# HG changeset patch # User Laman # Date 2015-09-30 20:31:52 # Node ID 1931061dd2e17ef88726f7e17d3c9dcdc4a1cd6b init commit diff --git a/.hgignore b/.hgignore new file mode 100644 --- /dev/null +++ b/.hgignore @@ -0,0 +1,1 @@ +^src/__pycache__/ \ No newline at end of file diff --git a/client.py b/client.py new file mode 100644 --- /dev/null +++ b/client.py @@ -0,0 +1,67 @@ +from hashtree import HashTree +import collections +from networkers import NetworkReader,NetworkWriter +import socket +import sys + + +localTree=HashTree.fromFile(open("clientFile.txt",mode="rb")) + +HOST = '127.0.0.1' # The remote host +PORT = 50009 # The same port as used by the server +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.connect((HOST, PORT)) +fr=s.makefile(mode='rb') +fw=s.makefile(mode='wb') + +networkReader=NetworkReader(fr) +networkReader.start() +networkWriter=NetworkWriter(fw) +networkWriter.start() + +blocksToTransfer=[] +nodeStack=collections.deque([0]) # root +incoming=networkReader.output # synchronized message queue +outcoming=networkWriter.input + + +# determine which blocks to send +while len(nodeStack)>0: + i=nodeStack.pop() + jsonData={"command":"req", "index":i} + outcoming.put((jsonData,b"")) + + jsonData,binData=incoming.get(timeout=2) + assert jsonData["index"]==i + assert jsonData["dataType"]=="hash" + + if localTree.store[i]!=binData: + if 2*i+3=0: + self.store[index]=hashlib.sha256(self.store[index*2+1]+self.store[index*2+2]).digest()[HashTree.HASH_LEN:] + index=(index-1)//2 + + ## Fast construction of the tree over the leaves. O(n). + def buildTree(self): + for i in range(self.leafStart-1,-1,-1): + self.store[i]=hashlib.sha256(self.store[i*2+1]+self.store[i*2+2]).digest()[HashTree.HASH_LEN:] + + +if __name__=="__main__": + f1=HashTree.fromFile(open("serverFile.txt",mode='rb')) + f2=HashTree.fromFile(open("clientFile.txt",mode='rb')) + + for i,(h1,h2) in enumerate(zip(f1.store,f2.store)): + print("{0:2}".format(i),h1.hex(),h2.hex(),h1==h2) diff --git a/networkers.py b/networkers.py new file mode 100644 --- /dev/null +++ b/networkers.py @@ -0,0 +1,54 @@ +import threading +import queue +import json + + +class NetworkReader(threading.Thread): + def __init__(self,stream): + threading.Thread.__init__(self,daemon=True) + self.parent=threading.current_thread() + + self.stream=stream + self.output=queue.Queue() + + def run(self): + while True: + if not self.parent.is_alive(): return + self.output.put(self.readMsg(),timeout=2) + + def readMsg(self): + data=self.stream.readline() + if not data: pass # !! raise something + jsonLength=int(data.split(b":")[1].strip()) # "json-length: length" -> length + data=self.stream.readline() + if not data: pass # !! raise something + binLength=int(data.split(b":")[1].strip()) # "bin-length: length" -> length + jsonData=self.stream.read(jsonLength) + jsonData=json.loads(str(jsonData,encoding="utf-8")) + binData=self.stream.read(binLength) + + return (jsonData,binData) + + +class NetworkWriter(threading.Thread): + def __init__(self,stream): + threading.Thread.__init__(self,daemon=True) + self.parent=threading.current_thread() + + self.stream=stream + self.input=queue.Queue() + + def run(self): + while True: + if not self.parent.is_alive(): return + msg=self.input.get(timeout=2) + if msg is None: return + self.stream.write(self.writeMsg(msg)) + self.stream.flush() + + def writeMsg(self,msg): + jsonData,binData=msg + jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8") + jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8") + binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8") + return b"".join((jsonLength,binLength,jsonData,binData)) diff --git a/server.py b/server.py new file mode 100644 --- /dev/null +++ b/server.py @@ -0,0 +1,67 @@ +import socket +from hashtree import HashTree +import queue +from networkers import NetworkReader,NetworkWriter +import collections +import sys + +# debug copy default file +import shutil +shutil.copyfile("serverFile_.txt","serverFile.txt") + + +localTree=HashTree.fromFile(open("serverFile.txt",mode="rb")) + +HOST = '' # Symbolic name meaning all available interfaces +PORT = 50009 # Arbitrary non-privileged port +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +s.bind((HOST, PORT)) +s.listen(1) +conn, addr = s.accept() +print('Connected by', addr) +fr=conn.makefile(mode="rb") +fw=conn.makefile(mode="wb") + +networkReader=NetworkReader(fr) +networkReader.start() +networkWriter=NetworkWriter(fw) +networkWriter.start() + +blocksToTransfer=[] +nodeStack=collections.deque([0]) +incoming=networkReader.output # synchronized message queue +outcoming=networkWriter.input + + +while True: + jsonData,binData=incoming.get(timeout=2) + + if jsonData["command"]=="req": # !! index out of range + print("received request for node #{0}".format(jsonData["index"])) + nodeHash=localTree.store[jsonData["index"]] + + jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"} + binResponse=nodeHash + + outcoming.put((jsonResponse,binResponse),timeout=2) + + elif jsonData["command"]=="send" and jsonData["dataType"]=="data": # needlessly allow hashes and data in mixed order + print("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:])) + + dataFile=open("serverFile.txt",mode="rb+") + dataFile.seek(jsonData["index"]*localTree.BLOCK_SIZE) + dataFile.write(binData) + dataFile.close() + + # never update the hash tree + + elif jsonData["command"]=="end": + print("closing...") + break + + else: pass # !! error + +# fr.close() +# fw.close() +conn.close() +sys.exit(0)