Files
@ f6a35be79a81
Branch filter:
Location: Morevna/src/networkers.py - annotation
f6a35be79a81
1.0 KiB
text/x-python
removed safety copy of the served file
34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 | import json
class NetworkReadError(AssertionError):
    """Raised when the stream ends early or announces an invalid length.

    Subclasses AssertionError so that callers which caught the failures of
    the former ``assert`` statements keep working unchanged.
    """


class NetworkReader:
    """Read length-prefixed messages from a binary stream.

    A message is two header lines, ``json-length: N`` and ``bin-length: M``,
    followed by N bytes of UTF-8 encoded JSON and M bytes of binary payload.
    """

    def __init__(self, stream):
        # The stream must offer readline() and read(n) and yield bytes.
        self.stream = stream

    def readMsg(self):
        """Read one complete message from the stream.

        Returns:
            A ``(jsonData, binData)`` tuple: the decoded JSON value and the
            raw binary payload as read from the stream.

        Raises:
            NetworkReadError: the stream ended before a full message was
                read, or a header announced a negative length.
            IndexError, ValueError: a header line has no ``:`` separator or
                a non-integer length, or the JSON section is not valid JSON.
        """
        # Both headers are consumed before either body section is read.
        jsonLength = self._readLength()
        binLength = self._readLength()
        jsonBytes = self._readExactly(jsonLength)
        jsonData = json.loads(str(jsonBytes, encoding="utf-8"))
        binData = self._readExactly(binLength)
        return (jsonData, binData)

    def _readLength(self):
        """Read one ``name: length`` header line and return the length."""
        line = self.stream.readline()
        if not line:
            # An empty result from readline() means end of stream.
            raise NetworkReadError("stream ended while reading a length header")
        length = int(line.split(b":")[1].strip())  # "name: length" -> length
        if length < 0:
            # read() with a negative size would consume the stream up to EOF.
            raise NetworkReadError("negative length in header: %r" % (line,))
        return length

    def _readExactly(self, count):
        """Read exactly *count* bytes or raise NetworkReadError."""
        data = self.stream.read(count)
        if len(data) != count:
            raise NetworkReadError(
                "expected %d bytes but got %d" % (count, len(data))
            )
        return data
class NetworkWriter:
def __init__(self,stream):
self.stream=stream
def writeMsg(self,*args):
self.stream.write(self.prepMsg(*args))
self.stream.flush()
def prepMsg(self,jsonData,binData=b""):
jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8")
jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8")
binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8")
return b"".join((jsonLength,binLength,jsonData,binData))
|