Changeset - 026618d6681b
[Not reviewed]
default
0 4 0
Laman - 8 years ago 2017-06-20 14:30:52

refactoring: members changed to private where possible
4 files changed with 45 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -3,46 +3,46 @@ import socket
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 

	
 

	
 
class Connection:
 
	def __init__(self):
 
		self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 
		self.socket.connect((conf.hosts[0], conf.port))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
		self._socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
		self._socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self.filename=filename
 
		self._filename=filename
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self.filename)
 
		localTree=HashTree.fromFile(self._filename)
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection() as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 

	
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
@@ -60,25 +60,25 @@ class Client:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
		progress.done()
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self.filename,mode="rb")
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		with Connection() as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
src/hashtree.py
Show inline comments
 
@@ -4,26 +4,26 @@ from datetime import datetime
 

	
 
from util import Progress
 

	
 

	
 
class HashTree:
 
	HASH_LEN=16 # bytes
 
	BLOCK_SIZE=4096 # bytes
 
	
 
	## Prepares a tree containing leafCount leaves.
 
	def __init__(self,leafCount):
 
		self.store=[b""]*(leafCount*2-1)
 
		self.leafStart=leafCount-1
 
		self.index=self.leafStart
 
		self.leafCount=leafCount
 
		self._index=self.leafStart
 
		
 
	@classmethod
 
	def fromFile(cls,filename):
 
		with open(filename,"rb") as f:
 
			stat=os.fstat(f.fileno())
 
			size=stat.st_size # !! symlinks
 
			leafCount=(size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks
 
			res=cls(leafCount)
 
			print(datetime.now(), "hashing file:")
 

	
 
			progress=Progress(leafCount)
 
			for i in range(leafCount):
 
@@ -43,32 +43,31 @@ class HashTree:
 
			size=stat.st_size
 
			nodeCount=size//HashTree.HASH_LEN
 
			res=cls((nodeCount+1)//2)
 

	
 
			for i in range(nodeCount):
 
				res.store[i]=f.read(HashTree.HASH_LEN)
 
		return res
 

	
 
	def save(self,filename):
 
		with open(filename,"wb") as f:
 
			for h in self.store:
 
				f.write(h)
 

	
 
		
 
	## Inserts a leaf at the first empty position.
 
	#	
 
	#
 
	#	Useful and used only during the tree construction.
 
	def insertLeaf(self,h):
 
		self.store[self.index]=h
 
		self.index+=1
 
		self.store[self._index]=h
 
		self._index+=1
 
		
 
	## Updates a hash stored in the leaf.
 
	def updateLeaf(self,index,h):
 
		if index<self.leafStart: raise IndexError()
 
		
 
		self.store[index]=h
 
		self.updateNode((index-1)//2)
 
	
 
	## Updates the node at index and all its ancestors.
 
	def updateNode(self,index):
 
		while index>=0:
 
			self.store[index]=hashlib.sha256(self.store[index*2+1]+self.store[index*2+2]).digest()[HashTree.HASH_LEN:]
src/networkers.py
Show inline comments
 
import json
 

	
 
import stats
 

	
 

	
 
class NetworkReader:
 
	def __init__(self,stream):
 
		self.stream=stream
 
		self._stream=stream
 

	
 
	def readMsg(self):
 
		data=self.stream.readline()
 
		data=self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		jsonLength=int(data.split(b":")[1].strip()) # "json-length: length" -> length
 

	
 
		data=self.stream.readline()
 
		data=self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		binLength=int(data.split(b":")[1].strip()) # "bin-length: length" -> length
 

	
 
		jsonData=self.stream.read(jsonLength)
 
		jsonData=self._stream.read(jsonLength)
 
		assert len(jsonData)==jsonLength
 
		stats.logReceived(jsonData)
 
		jsonData=json.loads(str(jsonData,encoding="utf-8"))
 

	
 
		binData=self.stream.read(binLength)
 
		binData=self._stream.read(binLength)
 
		assert len(binData)==binLength
 
		stats.logReceived(binData)
 
		
 
		return (jsonData,binData)
 
		
 

	
 
class NetworkWriter:
 
	def __init__(self,stream):
 
		self.stream=stream
 
		self._stream=stream
 

	
 
	def writeMsg(self,*args):
 
		msg=self.prepMsg(*args)
 
		self.stream.write(msg)
 
		self.stream.flush()
 
		self._stream.write(msg)
 
		self._stream.flush()
 
		stats.logSent(msg)
 

	
 
	def prepMsg(self,jsonData,binData=b""):
 
		jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8")
 
		jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8")
 
		binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8")
 

	
 
		return b"".join((jsonLength,binLength,jsonData,binData))
src/server.py
Show inline comments
 
import hashlib
 
import socket
 
import logging as log
 

	
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 
import config as conf
 

	
 

	
 
class Connection:
 
	def __init__(self,serverSocket):
 
		self.socket, address = serverSocket.accept()
 
		self._socket, address = serverSocket.accept()
 
		log.info('Connected by {0}'.format(address))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
		self._socket.close()
 

	
 

	
 
class Server:
 
	def __init__(self,filename,treeFile=""):
 
		self.filename=filename
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self.tree=HashTree.load(treeFile)
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self.tree=HashTree.fromFile(filename)
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self.tree.BLOCK_SIZE
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self.ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self.ss.bind(("",conf.port))
 
		self.ss.listen(1)
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
		self._lastWrite=-1
 
		self.dataFile=None
 
		self._dataFile=None
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self.ss) as (incoming,outcoming):
 
			with Connection(self._ss) as (incoming, outcoming):
 
				try:
 
					while True:
 
						if not self._serveOne(incoming,outcoming): return
 
				except AssertionError:
 
					continue
 

	
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self.tree.leafCount
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			outcoming.writeMsg(*self._requestHash(jsonData))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,jsonData):
 
		log.info("received request for node #{0}".format(jsonData["index"]))
 
		assert jsonData["index"]<len(self.tree.store)
 
		nodeHash=self.tree.store[jsonData["index"]]
 
		assert jsonData["index"]<len(self._tree.store)
 
		nodeHash=self._tree.store[jsonData["index"]]
 

	
 
		jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"}
 
		binResponse=nodeHash
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		if not self.dataFile:
 
			self.dataFile=open(self.filename,mode="rb+")
 
		if not self._dataFile:
 
			self._dataFile=open(self._filename, mode="rb+")
 
		i=jsonData["index"]
 
		if self._lastWrite+1!=i:
 
			self.dataFile.seek(i*self.BLOCK_SIZE)
 
		self.dataFile.write(binData)
 
			self._dataFile.seek(i * self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastWrite=i
 
		if self._treeFile:
 
			self._newLeaves[i+self.tree.leafStart]=hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 
			self._newLeaves[i + self._tree.leafStart]= hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 

	
 
		return ({"command": "ack", "index": i},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		if self.dataFile:
 
			self.dataFile.close()
 
		if self._dataFile:
 
			self._dataFile.close()
 
		if self._treeFile:
 
			log.info("updating hash tree...")
 
			for (k,v) in self._newLeaves.values():
 
				self.tree.updateLeaf(k,v)
 
			self.tree.save(self._treeFile)
 
				self._tree.updateLeaf(k, v)
 
			self._tree.save(self._treeFile)
 
		log.info("done")
0 comments (0 inline, 0 general)