Changeset - 1c3d9df86fb1
[Not reviewed]
default
0 3 0
Laman - 8 years ago 2017-05-07 00:02:04

switched from persistent to transient sockets
3 files changed with 113 insertions and 100 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -10,79 +10,85 @@ from networkers import NetworkReader,Net
 
filename=sys.argv[1]
 
 
 
def connect():
 
	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
	s.connect((conf.hosts[0], conf.port))
 
	fr=s.makefile(mode='rb')
 
	fw=s.makefile(mode='wb')
 
class Connection:
 
	def __init__(self):
 
		self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 
		self.socket.connect((conf.hosts[0], conf.port))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
 
	networkReader=NetworkReader(fr)
 
	networkReader.start()
 
	networkWriter=NetworkWriter(fw)
 
	networkWriter.start()
 
		networkReader=NetworkReader(fr)
 
		networkReader.start()
 
		networkWriter=NetworkWriter(fw)
 
		networkWriter.start()
 
 
	incoming=networkReader.output # synchronized message queue
 
	outcoming=networkWriter.input
 
		self.incoming=networkReader.output # synchronized message queue
 
		self.outcoming=networkWriter.input
 
 
	return (s,incoming,outcoming)
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
 
 
def negotiate(incoming,outcoming):
 
def negotiate():
 
	localTree=HashTree.fromFile(open(filename,mode="rb"))
 
	blocksToTransfer=[]
 
	nodeStack=collections.deque([0]) # root
 
 
	# initialize session
 
	jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
	outcoming.put((jsonData,b""))
 
	with Connection() as (incoming,outcoming):
 
		jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
		outcoming.put((jsonData,b""))
 
 
	# determine which blocks to send
 
	while len(nodeStack)>0:
 
		i=nodeStack.pop()
 
		jsonData={"command":"req", "index":i}
 
		outcoming.put((jsonData,b""))
 
		with Connection() as (incoming,outcoming):
 
			i=nodeStack.pop()
 
			jsonData={"command":"req", "index":i}
 
			outcoming.put((jsonData,b""))
 
 
		jsonData,binData=incoming.get(timeout=2)
 
		assert jsonData["index"]==i
 
		assert jsonData["dataType"]=="hash"
 
			jsonData,binData=incoming.get(timeout=2)
 
			assert jsonData["index"]==i
 
			assert jsonData["dataType"]=="hash"
 
 
		if localTree.store[i]!=binData:
 
			if 2*i+3<len(localTree.store): # inner node
 
				nodeStack.append(2*i+2)
 
				nodeStack.append(2*i+1)
 
			else: blocksToTransfer.append(i-localTree.leafStart) # leaf
 
			if localTree.store[i]!=binData:
 
				if 2*i+3<len(localTree.store): # inner node
 
					nodeStack.append(2*i+2)
 
					nodeStack.append(2*i+1)
 
				else: blocksToTransfer.append(i-localTree.leafStart) # leaf
 
 
	return blocksToTransfer
 
 
 
def sendData(outcoming,blocksToTransfer):
 
def sendData(blocksToTransfer):
 
	print(blocksToTransfer)
 
	dataFile=open(filename,mode="rb")
 
	i1=-1
 
 
	for i2 in blocksToTransfer:
 
		jsonData={"command":"send", "index":i2, "dataType":"data"}
 
		if i1+1!=i2:
 
			dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
		binData=dataFile.read(HashTree.BLOCK_SIZE)
 
		with Connection() as (incoming,outcoming):
 
			jsonData={"command":"send", "index":i2, "dataType":"data"}
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 
 
		print("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 
			print("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 
 
		outcoming.put((jsonData,binData),timeout=2)
 
		i1=i2
 
			outcoming.put((jsonData,binData),timeout=2)
 
			i1=i2
 
 
	jsonData={"command":"end"}
 
	outcoming.put((jsonData,b""),timeout=2)
 
	with Connection() as (incoming,outcoming):
 
		jsonData={"command":"end"}
 
		outcoming.put((jsonData,b""),timeout=2)
 
 
	outcoming.put(None)
 
	print("closing...")
 
	dataFile.close()
 
 
 
if __name__=="__main__":
 
	sock,incoming,outcoming = connect()
 
	blocksToTransfer=negotiate(incoming,outcoming)
 
	sendData(outcoming,blocksToTransfer)
 
	blocksToTransfer=negotiate()
 
	sendData(blocksToTransfer)
 
 
	sock.close()
 
	sys.exit(0)
src/networkers.py
Show inline comments
 
@@ -12,9 +12,8 @@ class NetworkReader(threading.Thread):
 
		self.output=queue.Queue()
 
		
 
	def run(self):
 
		while True:
 
			if not self.parent.is_alive(): return
 
			self.output.put(self.readMsg(),timeout=2)
 
		if not self.parent.is_alive(): return
 
		self.output.put(self.readMsg(),timeout=5)
 
			
 
	def readMsg(self):
 
		data=self.stream.readline()
 
@@ -39,14 +38,13 @@ class NetworkWriter(threading.Thread):
 
		self.input=queue.Queue()
 
		
 
	def run(self):
 
		while True:
 
			if not self.parent.is_alive(): return
 
			msg=self.input.get(timeout=2)
 
			if msg is None: return
 
			self.stream.write(self.writeMsg(msg))
 
			self.stream.flush()
 
		if not self.parent.is_alive(): return
 
		msg=self.input.get(timeout=5)
 
		if msg is None: return
 
		self.stream.write(self.prepMsg(msg))
 
		self.stream.flush()
 
			
 
	def writeMsg(self,msg):
 
	def prepMsg(self, msg):
 
		jsonData,binData=msg
 
		jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8")
 
		jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8")
src/server.py
Show inline comments
 
@@ -4,6 +4,8 @@ from networkers import NetworkReader,Net
 
import collections
 
import sys
 
 
import config as conf
 
 
# debug copy default file
 
import shutil
 
origFilename=sys.argv[1]
 
@@ -11,67 +13,74 @@ filename=origFilename+"_"
 
shutil.copyfile(origFilename,filename)
 
 
 
class Connection:
 
	def __init__(self,server_socket):
 
		self.socket, address = server_socket.accept()
 
		print('Connected by', address)
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
 
		networkReader=NetworkReader(fr)
 
		networkReader.start()
 
		networkWriter=NetworkWriter(fw)
 
		networkWriter.start()
 
 
		self.incoming=networkReader.output # synchronized message queue
 
		self.outcoming=networkWriter.input
 
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
 
 
localTree=HashTree.fromFile(open(filename,mode="rb"))
 
 
HOST = ''								 # Symbolic name meaning all available interfaces
 
PORT = 50009							# Arbitrary non-privileged port
 
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
s.bind((HOST, PORT))
 
s.listen(1)
 
conn, addr = s.accept()
 
print('Connected by', addr)
 
fr=conn.makefile(mode="rb")
 
fw=conn.makefile(mode="wb")
 
 
networkReader=NetworkReader(fr)
 
networkReader.start()
 
networkWriter=NetworkWriter(fw)
 
networkWriter.start()
 
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
ss.bind(("",conf.port))
 
ss.listen(1)
 
 
blocksToTransfer=[]
 
nodeStack=collections.deque([0])
 
incoming=networkReader.output # synchronized message queue
 
outcoming=networkWriter.input
 
 
i1=-1
 
 
 
while True:
 
	jsonData,binData=incoming.get(timeout=2)
 
	dataFile=open(filename,mode="rb+")
 
	
 
	if jsonData["command"]=="init":
 
		assert jsonData["blockSize"]==localTree.BLOCK_SIZE
 
		assert jsonData["blockCount"]==localTree.leafCount
 
		
 
	elif jsonData["command"]=="req": # !! index out of range
 
		print("received request for node #{0}".format(jsonData["index"]))
 
		nodeHash=localTree.store[jsonData["index"]]
 
		
 
		jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"}
 
		binResponse=nodeHash
 
		
 
		outcoming.put((jsonResponse,binResponse),timeout=2)
 
	
 
	elif jsonData["command"]=="send" and jsonData["dataType"]=="data": # needlessly allow hashes and data in mixed order
 
		print("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 
	with Connection(ss) as (incoming,outcoming):
 
		jsonData,binData=incoming.get(timeout=2)
 
		dataFile=open(filename,mode="rb+")
 
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==localTree.BLOCK_SIZE
 
			assert jsonData["blockCount"]==localTree.leafCount
 
 
		elif jsonData["command"]=="req": # !! index out of range
 
			print("received request for node #{0}".format(jsonData["index"]))
 
			nodeHash=localTree.store[jsonData["index"]]
 
 
			jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"}
 
			binResponse=nodeHash
 
 
			outcoming.put((jsonResponse,binResponse),timeout=2)
 
 
		i2=jsonData["index"]
 
		if i1+1!=i2:
 
			dataFile.seek(i2*localTree.BLOCK_SIZE)
 
		dataFile.write(binData)
 
		i1=i2
 
		
 
		# never update the hash tree
 
		
 
	elif jsonData["command"]=="end":
 
		print("closing...")
 
		break
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data": # needlessly allow hashes and data in mixed order
 
			print("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 
 
			i2=jsonData["index"]
 
			if i1+1!=i2:
 
				dataFile.seek(i2*localTree.BLOCK_SIZE)
 
			dataFile.write(binData)
 
			i1=i2
 
 
			# never update the hash tree
 
 
		elif jsonData["command"]=="end":
 
			print("closing...")
 
			break
 
	
 
	else: pass # !! error
 
		else: pass # !! error
 
 
# fr.close()
 
# fw.close()
 
dataFile.close()
 
conn.close()
 
sys.exit(0)
0 comments (0 inline, 0 general)