Changeset - 5813971dbecc
[Not reviewed]
default
0 3 0
Laman - 7 years ago 2017-10-19 00:05:32

sending hashes in batches
3 files changed with 26 insertions and 20 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -38,66 +38,72 @@ class Connection:
 
class Client:
 
	def __init__(self,filename):
 
		self._incoming=None
 
		self._outcoming=None
 
		self._filename=filename
 

	
 
		print(datetime.now(), "initializing...")
 
		self._localTree=HashTree.fromFile(self._filename)
 

	
 
	def negotiate(self):
 
		localTree=self._localTree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack"
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			i=nodeStack.pop()
 
			self._outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 
			indices=[]
 
			for i in range(256):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			indices.sort()
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 

	
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==i
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode()
 
			stats.logExchangedNode(len(indices))
 

	
 
			if localTree.store[i]!=binData:
 
				if 2*i+3<len(localTree.store): # inner node
 
					nodeStack.append(2*i+2)
 
					nodeStack.append(2*i+1)
 
				else:
 
					blocksToTransfer.append(i-localTree.leafStart) # leaf
 
					progress.p(i-localTree.leafStart)
 
			for (j,i) in enumerate(indices):
 
				(j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
		progress.done()
 

	
 
		return blocksToTransfer
 
		return sorted(blocksToTransfer)
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			jsonData={"command":"send", "index":i2, "dataType":"data"}
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
			self._outcoming.writeMsg(jsonData,binData)
 
			stats.logTransferredBlock()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
src/server.py
Show inline comments
 
@@ -90,55 +90,55 @@ class Server:
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			self._outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			self._locked=False
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,index):
 
		log.info("received request for node #{0}".format(index))
 
		assert index<len(self._tree.store)
 
		nodeHash=self._tree.store[index]
 
	def _requestHash(self,indices):
 
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes=[self._tree.store[i] for i in indices]
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"hash"}
 
		binResponse=nodeHash
 
		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse=b"".join(hashes)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		if self._lastIndex+1!=index:
 
			self._dataFile.seek(index*self.BLOCK_SIZE)
 
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		i=jsonData["index"]
 
		if self._lastIndex+1!=i:
 
			self._dataFile.seek(i*self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastIndex=i
 
		if self._treeFile:
 
			self._newLeaves[i+self._tree.leafStart]=hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 

	
src/stats.py
Show inline comments
 
class Stats:
 
	received=0
 
	sent=0
 
	exchangedNodes=0
 
	transferredBlocks=0
 

	
 

	
 
def logReceived(data):
 
	Stats.received+=len(data)
 

	
 

	
 
def logSent(data):
 
	Stats.sent+=len(data)
 

	
 

	
 
def logExchangedNode():
 
	Stats.exchangedNodes+=1
 
def logExchangedNode(k=1):
 
	Stats.exchangedNodes+=k
 

	
 

	
 
def logTransferredBlock():
 
	Stats.transferredBlocks+=1
 

	
 

	
 
def report():
 
	return """received {r}B
 
sent {s}B
 
exchanged {nodes} hash tree nodes
 
transferred {blocks} blocks""".format(r=Stats.received, s=Stats.sent, nodes=Stats.exchangedNodes, blocks=Stats.transferredBlocks)
0 comments (0 inline, 0 general)