Changeset - bacd3b2d37aa
[Not reviewed]
default
0 4 1
Laman - 7 years ago 2017-11-01 23:50:10

optimized hashtree batch update
5 files changed with 68 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/hashtree.py
Show inline comments
 
import hashlib
 
import collections
 
import hashlib
 
import os
 
from datetime import datetime
 

	
 
from util import Progress
 

	
 

	
 
def hashBlock(data):
 
	return hashlib.sha256(data).digest()[-HashTree.HASH_LEN:]
 

	
 

	
 
class HashTree:
 
	HASH_LEN=16 # bytes
 
	BLOCK_SIZE=4096 # bytes
 
	
 
	## Prepares a tree containing leafCount leaves.
 
	def __init__(self,leafCount):
 
		self.store=[b""]*(leafCount*2-1)
 
		self.leafStart=leafCount-1
 
		self.leafCount=leafCount
 
		self._index=self.leafStart
 
		
 
	@classmethod
 
	def fromFile(cls,filename):
 
		with open(filename,"rb") as f:
 
			stat=os.fstat(f.fileno())
 
			size=stat.st_size # !! symlinks
 
			leafCount=(size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks
 
			res=cls(leafCount)
 
			print(datetime.now(), "hashing file:")
 

	
 
			progress=Progress(leafCount)
 
			for i in range(leafCount):
 
				data=f.read(HashTree.BLOCK_SIZE)
 
				res.insertLeaf(hashBlock(data))
 

	
 
				progress.p(i)
 
			progress.done()
 
		res.buildTree()
 
		
 
		return res
 

	
 
	@classmethod
 
	def load(cls,filename):
 
		with open(filename,"rb") as f:
 
			stat=os.fstat(f.fileno())
 
			size=stat.st_size
 
			nodeCount=size//HashTree.HASH_LEN
 
			res=cls((nodeCount+1)//2)
 

	
 
			for i in range(nodeCount):
 
				res.store[i]=f.read(HashTree.HASH_LEN)
 
		return res
 

	
 
	def save(self,filename):
 
		with open(filename,"wb") as f:
 
			for h in self.store:
 
				f.write(h)
 
		
 
	## Inserts a leaf at the first empty position.
 
	#
 
	#	Useful and used only during the tree construction.
 
	def insertLeaf(self,h):
 
		self.store[self._index]=h
 
		self._index+=1
 
		
 
	## Updates a hash stored in the leaf.
 
	def updateLeaf(self,index,h):
 
		if index<self.leafStart: raise IndexError()
 
		
 
		self.store[index]=h
 
		self.updateNode((index-1)//2)
 
	
 
	## Updates the node at index and all its ancestors.
 
	def updateNode(self,index):
 
		while index>=0:
 
			self.store[index]=hashBlock(self.store[index*2+1]+self.store[index*2+2])
 
			index=(index-1)//2
 
			
 
	## Fast construction of the tree over the leaves. O(n).
 
	def buildTree(self):
 
		print(datetime.now(), "building tree:")
 
		progress=Progress(-1, self.leafStart-1)
 
		for i in range(self.leafStart-1,-1,-1):
 
			self.store[i]=hashBlock(self.store[i*2+1]+self.store[i*2+2])
 
			progress.p(i)
 
		progress.done()
 

	
 
	## Update faster than repeated insertLeaf.
 
	def batchUpdate(self,keysHashes):
 
		queue=collections.deque()
 
		for (k,v) in sorted(keysHashes):
 
			self.store[k]=v
 
			parentK=(k-1)//2
 
			if len(queue)==0 or queue[-1]!=parentK:
 
				queue.append(parentK)
 

	
 
		while len(queue)>0:
 
			k=queue.pop()
 
			self.store[k]=hashBlock(self.store[k*2+1]+self.store[k*2+2])
 
			parentK=(k-1)//2
 
			if (len(queue)==0 or queue[0]!=parentK) and k!=0:
 
				queue.appendleft(parentK)
src/netnode.py
Show inline comments
 
@@ -4,51 +4,50 @@ import logging as log
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
class FailedConnection(Exception): pass
 

	
 

	
 
class BaseConnection: # abstract
 
	def __init__(self):
 
		self._socket=None
 
		self.incoming=None
 
		self.outcoming=None
 

	
 
	def createNetworkers(self):
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		try:
 
			self._socket.shutdown(socket.SHUT_RDWR)
 
			self._socket.close()
 
		except OSError:
 
			log.warning("broken connection")
 

	
 

	
 
class NetNode:
 
	def __init__(self,filename,treeFile=""):
 
		self._incoming=None
 
		self._outcoming=None
 

	
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 

	
 
	def _updateTree(self):
 
		log.info("updating hash tree...")
 
		for (k,v) in self._newLeaves.items():
 
			self._tree.updateLeaf(k, v)
 
		self._tree.batchUpdate(self._newLeaves.items())
 
		self._tree.save(self._treeFile)
src/tests/__init__.py
Show inline comments
 
import sys
 

	
 

	
 
class RedirectedOutput():
 
	_stdout=None
 

	
 
	@classmethod
 
	def setUpClass(cls):
 
		cls._stdout=sys.stdout
 
		sys.stdout=open("/tmp/morevna-stdout.log",mode="a")
 

	
 
	@classmethod
 
	def tearDownClass(cls):
 
		sys.stdout.close()
 
		sys.stdout=cls._stdout
src/tests/test_hashtree.py
Show inline comments
 
new file 100644
 
import random
 
from unittest import TestCase
 

	
 
from hashtree import HashTree
 
from . import RedirectedOutput
 

	
 

	
 
random.seed(17)
 

	
 

	
 
def buildTree(leaves):
 
	tree=HashTree(len(leaves))
 
	for l in leaves:
 
		tree.insertLeaf(l)
 
	tree.buildTree()
 
	return tree
 

	
 

	
 
class TestMorevna(RedirectedOutput,TestCase):
 
	def test_batchUpdate(self):
 
		leaves=[b"a" for i in range(8)]
 
		t1=buildTree(leaves)
 
		keys=list(range(8))
 

	
 
		for i in range(8):
 
			random.shuffle(keys)
 
			for k in keys[:i+1]:
 
				leaves[k]=bytes([random.randrange(256)])
 
			t2=buildTree(leaves)
 
			t1.batchUpdate((k+t1.leafStart,leaves[k]) for k in keys[:i+1])
 
			self.assertEqual(t1.store,t2.store)
src/tests/test_overall.py
Show inline comments
 
import os
 
import sys
 
import shutil
 
import hashlib
 
import multiprocessing
 
from logging import FileHandler
 
from unittest import TestCase
 

	
 
import config
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection
 
from server import Miniserver
 
from . import RedirectedOutput
 

	
 

	
 
config.logger.removeHandler(config.handler)
 
handler=FileHandler("/tmp/morevna.log")
 
handler.setFormatter(config.formatter)
 
config.logger.addHandler(handler)
 

	
 
config.batchSize=8
 

	
 
dataDir=os.path.join(config.directory,"src/tests/data")
 
filename=os.path.join(dataDir,"test.img")
 

	
 

	
 
def compareFiles(f1,f2):
 
	with open(f1,mode="rb") as f:
 
		h2=hashlib.sha256(f.read()).hexdigest()
 
	with open(f2,mode="rb") as f:
 
		h=hashlib.sha256(f.read()).hexdigest()
 
	return (h,h2)
 

	
 

	
 
class TestMorevna(TestCase):
 
class TestMorevna(RedirectedOutput,TestCase):
 
	_stdout=None
 

	
 
	@classmethod
 
	def setUpClass(cls):
 
		cls._stdout=sys.stdout
 
		sys.stdout=open("/tmp/morevna-stdout.log",mode="a")
 

	
 
	def setUp(self):
 
		src=os.path.join(dataDir,"test1.img")
 
		shutil.copyfile(src,filename)
 

	
 
	@classmethod
 
	def tearDownClass(cls):
 
		super().tearDownClass()
 
		os.remove(filename)
 
		sys.stdout.close()
 
		sys.stdout=cls._stdout
 

	
 
	def test_build(self):
 
		treeFile=os.path.join(dataDir,"test.bin")
 
		refFile=os.path.join(dataDir,"test1.bin")
 

	
 
		tree=HashTree.fromFile(os.path.join(dataDir,"test1.img"))
 
		tree.save(treeFile)
 

	
 
		self.assertEqual(*compareFiles(refFile,treeFile))
 

	
 
		os.remove(treeFile)
 

	
 
	def test_push(self):
 
		ms=Miniserver(filename)
 
		p=multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		for clientFile in ("test2.img","test3.img","test4.img"):
 
			clientFile=os.path.join(dataDir,clientFile)
 
			c=Client(clientFile)
 
			with ClientConnection("127.0.0.1",config.port) as con:
 
				c.setConnection(con)
 
				blocksToTransfer=c.negotiate()
 
				c.sendData(blocksToTransfer)
 

	
 
			self.assertEqual(*compareFiles(clientFile,filename))
 

	
 
		p.terminate()
 
		p.join()
 

	
 
	def test_pull(self):
 
		serverFile=os.path.join(dataDir,"test3.img")
 
		ms=Miniserver(serverFile)
 
		p=multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		c=Client(filename)
 
		with ClientConnection("127.0.0.1",config.port) as con:
 
			c.setConnection(con)
 
			blocksToTransfer=c.negotiate()
 
			c.pullData(blocksToTransfer)
 

	
 
		self.assertEqual(*compareFiles(serverFile,filename))
 

	
 
		p.terminate()
 
		p.join()
0 comments (0 inline, 0 general)