Changeset - a52fefe61468
default
Laman - 2016-02-24 18:57:20

work in progress
6 files changed with 127 insertions and 1 deletion:
0 comments (0 inline, 0 general)
.hgignore
modified file chmod 100644 => 100755
client.py
modified file chmod 100644 => 100755

@@ -24,6 +24,9 @@ nodeStack=collections.deque([0]) # root
 incoming=networkReader.output # synchronized message queue
 outcoming=networkWriter.input
 
+# initialize session
+jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount}
+outcoming.put((jsonData,b""))
 
 # determine which blocks to send
 while len(nodeStack)>0:
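
The hunk context shows the traversal state: nodeStack starts at the root (index 0) and the loop under "determine which blocks to send" walks the hash tree top-down, descending only where hashes differ. The loop body lies outside this hunk; a sketch of the idea, with remoteHash() and blocksToSend as hypothetical placeholders for the request/response round-trip:

	while len(nodeStack)>0:
		i=nodeStack.pop()
		if localTree.store[i]==remoteHash(i): continue # subtree identical, skip it
		if i>=localTree.leafStart: blocksToSend.append(i-localTree.leafStart) # differing leaf block
		else: nodeStack.extend((2*i+1,2*i+2)) # descend into both children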
hashtree.py
modified file chmod 100644 => 100755

@@ -12,6 +12,7 @@ class HashTree:
     self.store=[None]*(leafCount*2-1)
     self.leafStart=leafCount-1
     self.index=self.leafStart
+    self.leafCount=leafCount
     
   @classmethod
   def fromFile(cls,fd):
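
The constructor allocates leafCount*2-1 nodes with the leaves starting at leafCount-1, i.e. the usual implicit binary-tree layout in a flat array. A sketch of the index arithmetic this implies (not part of the changeset):

	# node i has children 2*i+1 and 2*i+2 and parent (i-1)//2;
	# leaves occupy indices leafCount-1 .. 2*leafCount-2
	def parent(i): return (i-1)//2
	def children(i): return (2*i+1, 2*i+2)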
networkers.py
modified file chmod 100644 => 100755
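
networkers.py itself only changes mode in this changeset, but client.py and server.py use it through the networkReader.output and networkWriter.input queues. A hypothetical sketch of the queue-backed writer that usage implies (the class name, framing, and wire format are assumptions, not the actual implementation):

	import json,queue,threading

	class NetworkWriter(threading.Thread):
		def __init__(self,sock):
			super().__init__(daemon=True)
			self.sock=sock
			self.input=queue.Queue() # callers do networkWriter.input.put((jsonData,binData))

		def run(self):
			while True:
				jsonData,binData=self.input.get()
				header=json.dumps(jsonData).encode()
				# length-prefix both parts so the reader can reassemble the message
				self.sock.sendall(len(header).to_bytes(4,"big")+header
					+len(binData).to_bytes(4,"big")+binData)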
server.py
modified file chmod 100644 => 100755

@@ -36,7 +36,11 @@ outcoming=networkWriter.input
 while True:
   jsonData,binData=incoming.get(timeout=2)
   
-  if jsonData["command"]=="req": # !! index out of range
+  if jsonData["command"]=="init":
+    assert jsonData["blockSize"]==localTree.BLOCK_SIZE
+    assert jsonData["blockCount"]==localTree.leafCount
+    
+  elif jsonData["command"]=="req": # !! index out of range
     print("received request for node #{0}".format(jsonData["index"]))
     nodeHash=localTree.store[jsonData["index"]]
     
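
The "# !! index out of range" note marks a known issue: the client-supplied jsonData["index"] subscripts localTree.store unchecked. A minimal guard, as a sketch rather than part of this changeset:

	elif jsonData["command"]=="req":
		index=jsonData["index"]
		if not (isinstance(index,int) and 0<=index<len(localTree.store)):
			continue # ignore malformed or out-of-range requests
		print("received request for node #{0}".format(index))
		nodeHash=localTree.store[index]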
work.py
new file 100644

import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib

BLOCK_SIZE=4096
HASH_LEN=8
DB="morevna.db"
FILES_PER_DIR=100
FILE_COUNT=2**21

def initDB():
	conn=sqlite3.connect(DB)
	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`dirty` integer check(`dirty` in (0,1))
	)""")
	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()
	conn.close()

def initHashes(fileName, hashFile):
	with open(hashFile, mode="bw") as f:
		for chunkHash in chunks(fileName):
			f.write(chunkHash)

def chunks(fileName):
	with open(fileName, mode="br") as f:
		data=f.read(BLOCK_SIZE)
		while data:
			yield hashlib.sha256(data).digest()[:HASH_LEN]
			data=f.read(BLOCK_SIZE)

def hashes(fileName):
	# fileName is unused for now: the hashes are read from the DB;
	# the old file-based implementation is kept below for reference
#	with open(fileName, mode="br") as f:
#		hashBytes=f.read(HASH_LEN)
#		while hashBytes:
#			yield hashBytes
#			hashBytes=f.read(HASH_LEN)
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		h=handle.fetchone()
		while h is not None:
			yield h[0] # fetchone returns a one-item tuple, unwrap the blob
			h=handle.fetchone()

def compare(fileName, hashFile):
	changelist=[]

	# build changelist
	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
	# that's (8+4)*(8*2**30 / 4096) = 24MB for the defaults and an 8GB file
	for (i,(dataHash,savedHash)) in enumerate(zip(chunks(fileName),hashes(hashFile))):
		if dataHash!=savedHash:
			changelist.append((i,dataHash))

	# write log
	with open(logFile, mode="w") as f:
		f.write("sector\thash\n")
		for (i,dataHash) in changelist:
			f.write("{0}\t{1}\n".format(i,dataHash.hex())) # hex-encode the raw digest for the text log

	# update DB
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.executemany("""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
			({"id":i,"hash":dataHash} for (i,dataHash) in changelist))
		db.commit()

	# update hashFile
#	with open(hashFile, mode="r+b") as f:
#		for (i,dataHash) in changelist:
#			f.seek(i*HASH_LEN)
#			f.write(dataHash)

def transferChanges():
	# read changes
	with sqlite3.connect(DB) as db, open(fileName,mode="rb") as sf:
		handle=db.cursor()
		handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")

		# transfer modified sectors and mark them as clean
		sectorIds=[row[0] for row in handle.fetchall()] # unwrap the one-item row tuples
		for sectorId in sectorIds:
			path=getPath(sectorId)
			path.parent.mkdir(parents=True,exist_ok=True)
			with path.open(mode="wb") as df:
				sf.seek(sectorId*BLOCK_SIZE) # seek to the sector offset, not the raw index
				df.write(sf.read(BLOCK_SIZE))
			handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
			db.commit()

def getPath(index):
	nodeIds=[]
	k=1
	# ceil(log_FILES_PER_DIR(FILE_COUNT)) directory levels
	for i in range(math.ceil(math.log(FILE_COUNT)/math.log(FILES_PER_DIR))):
		nodeIds.append(index//k)
		k*=FILES_PER_DIR
	nodeIds.reverse()
	return pathlib.Path(*map(str,nodeIds)) # Path components must be strings

action=sys.argv[1]
fileName=sys.argv[2]
baseName=".".join(fileName.split(".")[:-1])
hashFile="{0}-hash.dat".format(baseName)
isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
logFile="{0}-{1}.log".format(baseName,isoDate)

if action=="init": initHashes(fileName, hashFile)
elif action=="update": compare(fileName, hashFile)
else: print("bad action")
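
For illustration, getPath fans the sector files out over a shallow directory tree: with FILES_PER_DIR=100 and FILE_COUNT=2**21 there are ceil(log(2**21)/log(100))=4 levels, so for example:

	>>> getPath(123456)
	PosixPath('0/12/1234/123456') # components 123456//10**6, //10**4, //10**2, //1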