Changeset - 026618d6681b
[Not reviewed]
default
0 4 0
Laman - 8 years ago 2017-06-20 14:30:52

refactoring: members changed to private where possible
4 files changed with 45 insertions and 46 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import collections
 
import socket
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 

	
 

	
 
class Connection:
 
	def __init__(self):
 
		self.socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 
		self.socket.connect((conf.hosts[0], conf.port))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
		self._socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
		self._socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self.filename=filename
 
		self._filename=filename
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self.filename)
 
		localTree=HashTree.fromFile(self._filename)
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection() as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 

	
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
			progress=Progress(localTree.leafCount)
 
			while len(nodeStack)>0:
 
				i=nodeStack.pop()
 
				outcoming.writeMsg({"command":"req", "index":i})
 

	
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["index"]==i
 
				assert jsonData["dataType"]=="hash"
 
				stats.logExchangedNode()
 

	
 
				if localTree.store[i]!=binData:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
		progress.done()
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self.filename,mode="rb")
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		with Connection() as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				outcoming.writeMsg(jsonData,binData)
 
				stats.logTransferredBlock()
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				i1=i2
 
				progress.p(k)
 
		progress.done()
 

	
 
		with Connection() as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 

	
src/hashtree.py
Show inline comments
 
import hashlib
 
import os
 
from datetime import datetime
 

	
 
from util import Progress
 

	
 

	
 
class HashTree:
 
	HASH_LEN=16 # bytes
 
	BLOCK_SIZE=4096 # bytes
 
	
 
	## Prepares a tree containing leafCount leaves.
 
	def __init__(self,leafCount):
 
		self.store=[b""]*(leafCount*2-1)
 
		self.leafStart=leafCount-1
 
		self.index=self.leafStart
 
		self.leafCount=leafCount
 
		self._index=self.leafStart
 
		
 
	@classmethod
 
	def fromFile(cls,filename):
 
		with open(filename,"rb") as f:
 
			stat=os.fstat(f.fileno())
 
			size=stat.st_size # !! symlinks
 
			leafCount=(size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks
 
			res=cls(leafCount)
 
			print(datetime.now(), "hashing file:")
 

	
 
			progress=Progress(leafCount)
 
			for i in range(leafCount):
 
				data=f.read(HashTree.BLOCK_SIZE)
 
				res.insertLeaf(hashlib.sha256(data).digest()[HashTree.HASH_LEN:])
 

	
 
				progress.p(i)
 
			progress.done()
 
		res.buildTree()
 
		
 
		return res
 

	
 
	@classmethod
 
	def load(cls,filename):
 
		with open(filename,"rb") as f:
 
			stat=os.fstat(f.fileno())
 
			size=stat.st_size
 
			nodeCount=size//HashTree.HASH_LEN
 
			res=cls((nodeCount+1)//2)
 

	
 
			for i in range(nodeCount):
 
				res.store[i]=f.read(HashTree.HASH_LEN)
 
		return res
 

	
 
	def save(self,filename):
 
		with open(filename,"wb") as f:
 
			for h in self.store:
 
				f.write(h)
 

	
 
		
 
	## Inserts a leaf at the first empty position.
 
	#	
 
	#
 
	#	Useful and used only during the tree construction.
 
	def insertLeaf(self,h):
 
		self.store[self.index]=h
 
		self.index+=1
 
		self.store[self._index]=h
 
		self._index+=1
 
		
 
	## Updates a hash stored in the leaf.
 
	def updateLeaf(self,index,h):
 
		if index<self.leafStart: raise IndexError()
 
		
 
		self.store[index]=h
 
		self.updateNode((index-1)//2)
 
	
 
	## Updates the node at index and all its ancestors.
 
	def updateNode(self,index):
 
		while index>=0:
 
			self.store[index]=hashlib.sha256(self.store[index*2+1]+self.store[index*2+2]).digest()[HashTree.HASH_LEN:]
 
			index=(index-1)//2
 
			
 
	## Fast construction of the tree over the leaves. O(n).
 
	def buildTree(self):
 
		print(datetime.now(), "building tree:")
 
		progress=Progress(-1, self.leafStart-1)
 
		for i in range(self.leafStart-1,-1,-1):
 
			self.store[i]=hashlib.sha256(self.store[i*2+1]+self.store[i*2+2]).digest()[HashTree.HASH_LEN:]
 
			progress.p(i)
 
		progress.done()
 

	
 

	
src/networkers.py
Show inline comments
 
import json
 

	
 
import stats
 

	
 

	
 
class NetworkReader:
 
	def __init__(self,stream):
 
		self.stream=stream
 
		self._stream=stream
 

	
 
	def readMsg(self):
 
		data=self.stream.readline()
 
		data=self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		jsonLength=int(data.split(b":")[1].strip()) # "json-length: length" -> length
 

	
 
		data=self.stream.readline()
 
		data=self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		binLength=int(data.split(b":")[1].strip()) # "bin-length: length" -> length
 

	
 
		jsonData=self.stream.read(jsonLength)
 
		jsonData=self._stream.read(jsonLength)
 
		assert len(jsonData)==jsonLength
 
		stats.logReceived(jsonData)
 
		jsonData=json.loads(str(jsonData,encoding="utf-8"))
 

	
 
		binData=self.stream.read(binLength)
 
		binData=self._stream.read(binLength)
 
		assert len(binData)==binLength
 
		stats.logReceived(binData)
 
		
 
		return (jsonData,binData)
 
		
 

	
 
class NetworkWriter:
 
	def __init__(self,stream):
 
		self.stream=stream
 
		self._stream=stream
 

	
 
	def writeMsg(self,*args):
 
		msg=self.prepMsg(*args)
 
		self.stream.write(msg)
 
		self.stream.flush()
 
		self._stream.write(msg)
 
		self._stream.flush()
 
		stats.logSent(msg)
 

	
 
	def prepMsg(self,jsonData,binData=b""):
 
		jsonData=bytes(json.dumps(jsonData)+"\n",encoding="utf-8")
 
		jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8")
 
		binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8")
 

	
 
		return b"".join((jsonLength,binLength,jsonData,binData))
src/server.py
Show inline comments
 
import hashlib
 
import socket
 
import logging as log
 

	
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 
import config as conf
 

	
 

	
 
class Connection:
 
	def __init__(self,serverSocket):
 
		self.socket, address = serverSocket.accept()
 
		self._socket, address = serverSocket.accept()
 
		log.info('Connected by {0}'.format(address))
 
		fr=self.socket.makefile(mode="rb")
 
		fw=self.socket.makefile(mode="wb")
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self.socket.close()
 
		self._socket.close()
 

	
 

	
 
class Server:
 
	def __init__(self,filename,treeFile=""):
 
		self.filename=filename
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self.tree=HashTree.load(treeFile)
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self.tree=HashTree.fromFile(filename)
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self.tree.BLOCK_SIZE
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self.ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self.ss.bind(("",conf.port))
 
		self.ss.listen(1)
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
		self._lastWrite=-1
 
		self.dataFile=None
 
		self._dataFile=None
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self.ss) as (incoming,outcoming):
 
			with Connection(self._ss) as (incoming, outcoming):
 
				try:
 
					while True:
 
						if not self._serveOne(incoming,outcoming): return
 
				except AssertionError:
 
					continue
 

	
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self.tree.leafCount
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			outcoming.writeMsg(*self._requestHash(jsonData))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,jsonData):
 
		log.info("received request for node #{0}".format(jsonData["index"]))
 
		assert jsonData["index"]<len(self.tree.store)
 
		nodeHash=self.tree.store[jsonData["index"]]
 
		assert jsonData["index"]<len(self._tree.store)
 
		nodeHash=self._tree.store[jsonData["index"]]
 

	
 
		jsonResponse={"command":"send", "index":jsonData["index"], "dataType":"hash"}
 
		binResponse=nodeHash
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		if not self.dataFile:
 
			self.dataFile=open(self.filename,mode="rb+")
 
		if not self._dataFile:
 
			self._dataFile=open(self._filename, mode="rb+")
 
		i=jsonData["index"]
 
		if self._lastWrite+1!=i:
 
			self.dataFile.seek(i*self.BLOCK_SIZE)
 
		self.dataFile.write(binData)
 
			self._dataFile.seek(i * self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastWrite=i
 
		if self._treeFile:
 
			self._newLeaves[i+self.tree.leafStart]=hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 
			self._newLeaves[i + self._tree.leafStart]= hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 

	
 
		return ({"command": "ack", "index": i},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		if self.dataFile:
 
			self.dataFile.close()
 
		if self._dataFile:
 
			self._dataFile.close()
 
		if self._treeFile:
 
			log.info("updating hash tree...")
 
			for (k,v) in self._newLeaves.values():
 
				self.tree.updateLeaf(k,v)
 
			self.tree.save(self._treeFile)
 
				self._tree.updateLeaf(k, v)
 
			self._tree.save(self._treeFile)
 
		log.info("done")
0 comments (0 inline, 0 general)