Files
@ 6686770e5fe6
Branch filter:
Location: Morevna/src/networkers.py - annotation
6686770e5fe6
1.0 KiB
text/x-python
simple shell backup script
34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 | import json
class NetworkReader:
    """Reads framed messages from a binary stream.

    A frame consists of two header lines followed by two payloads:

        json-length: <N>\\n
        bin-length: <M>\\n
        <N bytes of UTF-8 encoded JSON><M bytes of raw binary data>

    The stream must provide ``readline()`` and ``read(n)`` returning bytes.
    """

    def __init__(self, stream):
        self.stream = stream

    def readMsg(self):
        """Read one frame from the stream.

        Returns:
            A ``(jsonData, binData)`` tuple: the decoded JSON value and the
            raw binary payload as bytes.

        Raises:
            AssertionError: the stream ended before a complete frame was read.
            IndexError: a header line contains no ``:`` separator.
            ValueError: a header length is not an integer, or the JSON
                payload is malformed.
        """
        jsonLength = self._readLength()  # "json-length: length" -> length
        binLength = self._readLength()  # "bin-length: length" -> length
        # The JSON payload is read and parsed before the binary payload is
        # consumed, same as the order of the fields on the wire.
        jsonData = json.loads(str(self._readExact(jsonLength), encoding="utf-8"))
        binData = self._readExact(binLength)
        return (jsonData, binData)

    def _readLength(self):
        """Read one header line and return the integer after its first colon.

        The header name itself is not checked; only the value is parsed.
        """
        line = self.stream.readline()
        if not line:
            # Raised explicitly (rather than via an ``assert`` statement) so
            # the check is still performed when Python runs with -O.
            raise AssertionError("stream ended while reading a message header")
        return int(line.split(b":")[1].strip())

    def _readExact(self, length):
        """Read exactly ``length`` bytes, looping over short reads."""
        if length < 0:
            raise AssertionError("negative payload length: %d" % length)
        chunks = []
        remaining = length
        while remaining > 0:
            # A single read() may legally return fewer bytes than requested
            # (e.g. on raw, unbuffered streams), so keep reading until done.
            chunk = self.stream.read(remaining)
            if not chunk:
                raise AssertionError(
                    "stream ended with %d of %d payload bytes missing"
                    % (remaining, length)
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)
class NetworkWriter:
    """Serializes messages into frames and writes them to a binary stream.

    A frame is two header lines, ``json-length: <N>`` and ``bin-length: <M>``,
    followed by N bytes of UTF-8 JSON (including a trailing newline) and
    M bytes of binary data.
    """

    def __init__(self, stream):
        self.stream = stream

    def writeMsg(self, *args):
        """Build a frame from ``args`` (see ``prepMsg``), write it and flush."""
        self.stream.write(self.prepMsg(*args))
        self.stream.flush()

    def prepMsg(self, jsonData, binData=b""):
        """Return the complete frame for one message as bytes.

        Args:
            jsonData: value passed to ``json.dumps``.
            binData: binary payload appended after the JSON part.
        """
        # The JSON text gets a trailing newline, which is counted in its length.
        payload = (json.dumps(jsonData) + "\n").encode("utf-8")
        jsonHeader = ("json-length: " + str(len(payload)) + "\n").encode("utf-8")
        binHeader = ("bin-length: " + str(len(binData)) + "\n").encode("utf-8")
        frameParts = [jsonHeader, binHeader, payload, binData]
        return b"".join(frameParts)
|