Changeset - 2e5828ec7d49
[Not reviewed]
default
0 1 0
Laman - 7 years ago 2017-10-19 10:15:32

fixed order of negotiation to a "batch DFS"
1 file changed with 10 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -23,87 +23,93 @@ class Connection:
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self._incoming=None
 
		self._outcoming=None
 
		self._filename=filename
 

	
 
		print(datetime.now(), "initializing...")
 
		self._localTree=HashTree.fromFile(self._filename)
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree=self._localTree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack"
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			indices=[]
 
			for i in range(256):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			indices.sort()
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 

	
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode(len(indices))
 

	
 
			frontier=[]
 
			for (j,i) in enumerate(indices):
 
				(j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		return sorted(blocksToTransfer)
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			jsonData={"command":"send", "index":i2, "dataType":"data"}
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
			self._outcoming.writeMsg(jsonData,binData)
 
			stats.logTransferredBlock()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
0 comments (0 inline, 0 general)