Files
@ 7b737e64c6a0
Branch filter:
Location: Morevna/src/networkers.py - annotation
7b737e64c6a0
1.0 KiB
text/x-python
actually using host and port command line parameters
34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 | import json
class NetworkReader:
    """Read framed messages (a JSON part plus a binary part) from a byte stream.

    Each message, as parsed by readMsg, consists of two header lines followed
    by the two payloads:

        <label>: <json length>   (first header line)
        <label>: <bin length>    (second header line)
        <json length bytes of UTF-8 encoded JSON>
        <bin length bytes of raw binary data>

    Only the integer after the first colon of each header line is used; the
    label text in front of the colon is not checked.
    """

    def __init__(self, stream):
        """Store *stream*, which must offer readline() and read(n) returning bytes."""
        self.stream = stream

    def _readHeaderValue(self, what):
        """Read one header line and return the integer that follows its colon."""
        line = self.stream.readline()
        if not line:
            # Raised explicitly (not via ``assert``) so the check is kept under
            # ``python -O``; AssertionError is what the assert-based code raised.
            raise AssertionError("stream ended while reading the %s header" % what)
        # "json-length: 12" -> 12 ; a line without a colon raises IndexError,
        # a non-numeric value raises ValueError.
        return int(line.split(b":")[1].strip())

    def _readExact(self, count, what):
        """Read *count* bytes of the *what* payload, failing on a short read."""
        chunk = self.stream.read(count)
        if len(chunk) != count:
            raise AssertionError(
                "expected %d bytes of %s data, got %d" % (count, what, len(chunk))
            )
        return chunk

    def readMsg(self):
        """Read the next message from the stream.

        Returns:
            A tuple ``(jsonData, binData)``: the decoded JSON value and the
            binary payload exactly as read from the stream.

        Raises:
            AssertionError: the stream ended before a header line could be
                read, or a payload was shorter than its announced length.
            IndexError, ValueError: a header line is malformed, or the JSON
                payload is not valid UTF-8 / JSON.
        """
        # Both headers come first, then both payloads, in this fixed order.
        jsonLength = self._readHeaderValue("json-length")
        binLength = self._readHeaderValue("bin-length")
        rawJson = self._readExact(jsonLength, "json")
        jsonData = json.loads(str(rawJson, encoding="utf-8"))
        binData = self._readExact(binLength, "binary")
        return (jsonData, binData)
class NetworkWriter:
    """Serialize messages (a JSON part plus a binary part) onto a byte stream.

    A message is emitted as two header lines, "json-length: N" and
    "bin-length: M", followed by N bytes of UTF-8 JSON text (which ends with a
    newline that is counted in N) and M bytes of binary data.
    """

    def __init__(self, stream):
        """Store *stream*; writeMsg calls its write() and flush() methods."""
        self.stream = stream

    def writeMsg(self, *args):
        """Build a message from *args* (see prepMsg), write it, and flush."""
        send = self.stream.write
        send(self.prepMsg(*args))
        self.stream.flush()

    def prepMsg(self, jsonData, binData=b""):
        """Return the complete wire representation of one message as bytes.

        Args:
            jsonData: value to serialize with json.dumps.
            binData: bytes-like binary payload appended after the JSON text.
        """
        # The JSON text carries a trailing newline, which is part of its length.
        body = (json.dumps(jsonData) + "\n").encode("utf-8")
        header = "json-length: %d\nbin-length: %d\n" % (len(body), len(binData))
        return b"".join([header.encode("utf-8"), body, binData])
|