diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -17,13 +17,8 @@ class Connection: fr=self.socket.makefile(mode="rb") fw=self.socket.makefile(mode="wb") - networkReader=NetworkReader(fr) - networkReader.start() - networkWriter=NetworkWriter(fw) - networkWriter.start() - - self.incoming=networkReader.output # synchronized message queue - self.outcoming=networkWriter.input + self.incoming=NetworkReader(fr) + self.outcoming=NetworkWriter(fw) def __enter__(self): return self.incoming,self.outcoming @@ -40,16 +35,15 @@ def negotiate(): # initialize session with Connection() as (incoming,outcoming): jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version} - outcoming.put((jsonData,b"")) + outcoming.writeMsg(jsonData) # determine which blocks to send while len(nodeStack)>0: with Connection() as (incoming,outcoming): i=nodeStack.pop() - jsonData={"command":"req", "index":i} - outcoming.put((jsonData,b"")) + outcoming.writeMsg({"command":"req", "index":i}) - jsonData,binData=incoming.get(timeout=2) + jsonData,binData=incoming.readMsg() assert jsonData["index"]==i assert jsonData["dataType"]=="hash" @@ -76,12 +70,11 @@ def sendData(blocksToTransfer): print("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:])) - outcoming.put((jsonData,binData),timeout=2) + outcoming.writeMsg(jsonData,binData) i1=i2 with Connection() as (incoming,outcoming): - jsonData={"command":"end"} - outcoming.put((jsonData,b""),timeout=2) + outcoming.writeMsg({"command":"end"}) print("closing...") dataFile.close() diff --git a/src/networkers.py b/src/networkers.py --- a/src/networkers.py +++ b/src/networkers.py @@ -1,20 +1,10 @@ -import threading -import queue -import json +import json -class NetworkReader(threading.Thread): +class NetworkReader: def __init__(self,stream): - threading.Thread.__init__(self,daemon=True) - self.parent=threading.current_thread() - self.stream=stream - self.output=queue.Queue() - - def run(self): - if not self.parent.is_alive(): return - self.output.put(self.readMsg(),timeout=5) - + def readMsg(self): data=self.stream.readline() if not data: pass # !! raise something @@ -29,23 +19,15 @@ class NetworkReader(threading.Thread): return (jsonData,binData) -class NetworkWriter(threading.Thread): +class NetworkWriter: def __init__(self,stream): - threading.Thread.__init__(self,daemon=True) - self.parent=threading.current_thread() - self.stream=stream - self.input=queue.Queue() - - def run(self): - if not self.parent.is_alive(): return - msg=self.input.get(timeout=5) - if msg is None: return - self.stream.write(self.prepMsg(msg)) + + def writeMsg(self,*args): + self.stream.write(self.prepMsg(*args)) self.stream.flush() - - def prepMsg(self, msg): - jsonData,binData=msg + + def prepMsg(self,jsonData,binData=b""): jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8") jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8") binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8") diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -20,13 +20,8 @@ class Connection: fr=self.socket.makefile(mode="rb") fw=self.socket.makefile(mode="wb") - networkReader=NetworkReader(fr) - networkReader.start() - networkWriter=NetworkWriter(fw) - networkWriter.start() - - self.incoming=networkReader.output # synchronized message queue - self.outcoming=networkWriter.input + self.incoming=NetworkReader(fr) + self.outcoming=NetworkWriter(fw) def __enter__(self): return self.incoming,self.outcoming @@ -49,7 +44,7 @@ i1=-1 while True: with Connection(ss) as (incoming,outcoming): - jsonData,binData=incoming.get(timeout=2) + jsonData,binData=incoming.readMsg() dataFile=open(filename,mode="rb+") if jsonData["command"]=="init": @@ -63,7 +58,7 @@ while True: jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"} binResponse=nodeHash - outcoming.put((jsonResponse,binResponse),timeout=2) + outcoming.writeMsg(jsonResponse,binResponse) elif jsonData["command"]=="send" and jsonData["dataType"]=="data": # needlessly allow hashes and data in mixed order print("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))