Changeset - dad65188b1a0
[Not reviewed]
default
0 3 0
Laman - 8 years ago 2017-06-05 23:59:32

from transient back to persistent sockets
3 files changed with 44 insertions and 32 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import collections
 
import socket
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
from util import progress
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 

	
 

	
 
class Connection:
 
	def __init__(self):
 
		self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 
		self.socket.connect((conf.hosts[0], conf.port))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self.filename=filename
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self.filename)
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection() as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		while len(nodeStack)>0:
 
			with Connection() as (incoming,outcoming):
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
			while len(nodeStack)>0:
 
				i=nodeStack.pop()
 
				outcoming.writeMsg({"command":"req", "index":i})
 

	
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["index"]==i
 
				assert jsonData["dataType"]=="hash"
 

	
 
				if localTree.store[i]!=binData:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress(i-localTree.leafStart, localTree.leafCount)
 
		print("100%")
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self.filename,mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			with Connection() as (incoming,outcoming):
 
		with Connection() as (incoming,outcoming):
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				outcoming.writeMsg(jsonData,binData)
 
			i1=i2
 
			progress(k,len(blocksToTransfer))
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				i1=i2
 
				progress(k,len(blocksToTransfer))
 
		print("100%")
 

	
 
		with Connection() as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 

	
 
		log.info(datetime.now(), "closing session...")
 
		log.info("closing session...")
 
		dataFile.close()
src/networkers.py
Show inline comments
 
import json
 

	
 

	
 
class NetworkReader:
 
	def __init__(self,stream):
 
		self.stream=stream
 

	
 
	def readMsg(self):
 
		data=self.stream.readline()
 
		if not data: pass # !! raise something
 
		assert data
 
		jsonLength=int(data.split(b":")[1].strip()) # "json-length: length" -> length
 
		data=self.stream.readline()
 
		if not data: pass # !! raise something
 
		assert data
 
		binLength=int(data.split(b":")[1].strip()) # "bin-length: length" -> length
 
		jsonData=self.stream.read(jsonLength)
 
		assert len(jsonData)==jsonLength
 
		jsonData=json.loads(str(jsonData,encoding="utf-8"))
 
		binData=self.stream.read(binLength)
 
		assert len(binData)==binLength
 
		
 
		return (jsonData,binData)
 
		
 

	
 
class NetworkWriter:
 
	def __init__(self,stream):
 
		self.stream=stream
 

	
 
	def writeMsg(self,*args):
 
		self.stream.write(self.prepMsg(*args))
 
		self.stream.flush()
 

	
 
	def prepMsg(self,jsonData,binData=b""):
 
		jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8")
 
		jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8")
 
		binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8")
 

	
 
		return b"".join((jsonLength,binLength,jsonData,binData))
src/server.py
Show inline comments
 
import socket
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 
import logging as log
 

	
 
import config as conf
 

	
 

	
 
class Connection:
 
	def __init__(self,serverSocket):
 
		self.socket, address = serverSocket.accept()
 
		log.info('Connected by {0}'.format(address))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 

	
 

	
 
class Server:
 
	def __init__(self,filename,treeFile=""):
 
		self.filename=filename
 

	
 
		if treeFile:
 
			self.tree=HashTree.load(treeFile)
 
		else:
 
			self.tree=HashTree.fromFile(filename)
 

	
 
		self.BLOCK_SIZE=self.tree.BLOCK_SIZE
 

	
 
		self.ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self.ss.bind(("",conf.port))
 
		self.ss.listen(1)
 

	
 
		self._lastWrite=-1
 
		self.dataFile=None
 

	
 
	def serve(self):
 
		while self._serveOne():
 
			pass
 

	
 
	def _serveOne(self):
 
		with Connection(self.ss) as (incoming,outcoming):
 
			jsonData,binData=incoming.readMsg()
 
		while True:
 
			with Connection(self.ss) as (incoming,outcoming):
 
				try:
 
					while True:
 
						if not self._serveOne(incoming,outcoming): return
 
				except AssertionError:
 
					continue
 

	
 
			if jsonData["command"]=="init":
 
				assert jsonData["blockSize"]==self.BLOCK_SIZE
 
				assert jsonData["blockCount"]==self.tree.leafCount
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 

	
 
			elif jsonData["command"]=="req":
 
				outcoming.writeMsg(*self._requestHash(jsonData))
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self.tree.leafCount
 
			outcoming.writeMsg({"command": "ack"})
 

	
 
			elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
				self._receiveData(jsonData,binData)
 
		elif jsonData["command"]=="req":
 
			outcoming.writeMsg(*self._requestHash(jsonData))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
			elif jsonData["command"]=="end":
 
				log.info("closing session...")
 
				if self.dataFile:
 
					self.dataFile.close()
 
				return False
 
		elif jsonData["command"]=="end":
 
			log.info("closing session...")
 
			if self.dataFile:
 
				self.dataFile.close()
 
			return False
 

	
 
			else:
 
				assert False, jsonData["command"]
 
		else:
 
			assert False, jsonData["command"]
 

	
 
			return True
 
		return True
 

	
 
	def _requestHash(self,jsonData):
 
		log.info("received request for node #{0}".format(jsonData["index"]))
 
		assert jsonData["index"]<len(self.tree.store)
 
		nodeHash=self.tree.store[jsonData["index"]]
 

	
 
		jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"}
 
		binResponse=nodeHash
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		if not self.dataFile:
 
			self.dataFile=open(self.filename,mode="rb+")
 
		i=jsonData["index"]
 
		if self._lastWrite+1!=i:
 
			self.dataFile.seek(i*self.BLOCK_SIZE)
 
		self.dataFile.write(binData)
 
		self._lastWrite=i
 

	
 
		return ({"command": "ack", "index": i},)
 
		# never update the hash tree
0 comments (0 inline, 0 general)