diff --git a/.hgignore b/.hgignore
old mode 100644
new mode 100755
diff --git a/client.py b/client.py
old mode 100644
new mode 100755
--- a/client.py
+++ b/client.py
@@ -24,6 +24,9 @@
 nodeStack=collections.deque([0]) # root
 incoming=networkReader.output # synchronized message queue
 outcoming=networkWriter.input
+# initialize session
+jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount}
+outcoming.put((jsonData,b""))
 
 # determine which blocks to send
 while len(nodeStack)>0:
diff --git a/hashtree.py b/hashtree.py
old mode 100644
new mode 100755
--- a/hashtree.py
+++ b/hashtree.py
@@ -12,6 +12,7 @@ class HashTree:
         self.store=[None]*(leafCount*2-1)
         self.leafStart=leafCount-1
         self.index=self.leafStart
+        self.leafCount=leafCount
 
     @classmethod
     def fromFile(cls,fd):
diff --git a/networkers.py b/networkers.py
old mode 100644
new mode 100755
diff --git a/server.py b/server.py
old mode 100644
new mode 100755
--- a/server.py
+++ b/server.py
@@ -36,7 +36,11 @@
 outcoming=networkWriter.input
 
 while True:
     jsonData,binData=incoming.get(timeout=2)
-    if jsonData["command"]=="req": # !! index out of range
+    if jsonData["command"]=="init":
+        assert jsonData["blockSize"]==localTree.BLOCK_SIZE
+        assert jsonData["blockCount"]==localTree.leafCount
+
+    elif jsonData["command"]=="req": # !! index out of range
         print("received request for node #{0}".format(jsonData["index"]))
         nodeHash=localTree.store[jsonData["index"]]
diff --git a/work.py b/work.py
new file mode 100644
--- /dev/null
+++ b/work.py
@@ -0,0 +1,125 @@
+import hashlib
+import sys
+import datetime
+import sqlite3
+import math
+import pathlib
+
+BLOCK_SIZE=4096
+HASH_LEN=8
+DB="morevna.db"
+FILES_PER_DIR=100
+FILE_COUNT=2**21
+
+def initDB():
+    conn=sqlite3.connect(DB)
+
+    c=conn.cursor()
+    c.execute("""create table if not exists `hashes` (
+        `sector_id` integer primary key,
+        `sector_hash` blob not null,
+        `dirty` integer check(`dirty` in (0,1))
+    )""")
+
+    c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
+    conn.commit()
+    conn.close()
+
+def initHashes(fileName, hashFile):
+    with open(hashFile, mode="wb") as f:
+        for chunkHash in chunks(fileName):
+            f.write(chunkHash)
+
+def chunks(fileName):
+    # yield a truncated SHA-256 hash for every BLOCK_SIZE bytes of the file
+    with open(fileName, mode="rb") as f:
+        data=f.read(BLOCK_SIZE)
+        while data:
+            yield hashlib.sha256(data).digest()[:HASH_LEN]
+            data=f.read(BLOCK_SIZE)
+
+def hashes(filename):
+    # yield the stored hashes from the DB; the filename parameter is only
+    # used by the older file-based variant kept below for reference
+#    with open(filename, mode="br") as f:
+#        hashBytes=f.read(HASH_LEN)
+#        while hashBytes:
+#            yield hashBytes
+#            hashBytes=f.read(HASH_LEN)
+    with sqlite3.connect(DB) as db:
+        handle=db.cursor()
+        handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
+        h=handle.fetchone()
+        while h is not None:
+            yield h[0] # unwrap the single-column row into raw bytes
+            h=handle.fetchone()
+
+def compare(fileName, hashFile):
+    changelist=[]
+
+    # build changelist
+    # can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
+    # that's (8+4)*(8*2**30 / 4096) = 24MB for defaults
+    for (i,(dataHash,savedHash)) in enumerate(zip(chunks(fileName),hashes(hashFile))):
+        if dataHash!=savedHash:
+            changelist.append((i,dataHash))
+
+    # write log
+    with open(logFile, mode="w") as f:
+        f.write("sector\thash\n")
+        for (i,dataHash) in changelist:
+            f.write("{0}\t{1}\n".format(i,dataHash.hex()))
+
+    # update DB
+    with sqlite3.connect(DB) as db:
+        handle=db.cursor()
+        handle.executemany("""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
+            ({"id":i,"hash":dataHash} for (i,dataHash) in changelist))
+        db.commit()
+
+    # update hashFile
+#    with open(hashFile, mode="r+b") as f:
+#        for (i,dataHash) in changelist:
+#            f.seek(i*HASH_LEN)
+#            f.write(dataHash)
+
+def transferChanges():
+    # read changes
+    with sqlite3.connect(DB) as db, open(fileName,mode="rb") as sf:
+        handle=db.cursor()
+        handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")
+
+        # transfer modified sectors and mark them as clean
+        sectorIds=handle.fetchall()
+        for (sectorId,) in sectorIds:
+            path=getPath(sectorId)
+            path.parent.mkdir(parents=True,exist_ok=True)
+            with path.open(mode="wb") as df:
+                sf.seek(sectorId*BLOCK_SIZE)
+                df.write(sf.read(BLOCK_SIZE))
+            handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
+        db.commit()
+
+def getPath(index):
+    # map a sector index to a path in a tree with FILES_PER_DIR children per directory
+    nodeIds=[]
+    k=1
+    for _ in range(math.ceil(math.log(FILE_COUNT,FILES_PER_DIR))):
+        nodeIds.append(str(index//k))
+        k*=FILES_PER_DIR
+    nodeIds.reverse()
+    return pathlib.Path(*nodeIds)
+
+action=sys.argv[1]
+fileName=sys.argv[2]
+baseName=".".join(fileName.split(".")[:-1])
+hashFile="{0}-hash.dat".format(baseName)
+isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
+logFile="{0}-{1}.log".format(baseName,isoDate)
+
+if action=="init":
+    initDB()
+    initHashes(fileName, hashFile)
+elif action=="update": compare(fileName, hashFile)
+elif action=="transfer": transferChanges()
+else: print("bad action")
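
The init handshake added to client.py and server.py boils down to the check below. A minimal sketch: the networkers message queues are replaced by a plain dict handoff, and the HashTree instances on both sides are stubbed to the two attributes the handshake actually reads (real trees come from HashTree.fromFile).

    # hypothetical stub exposing only what the handshake reads
    class TreeStub:
        BLOCK_SIZE=4096
        leafCount=2**21

    clientTree=TreeStub()
    serverTree=TreeStub()

    # client side: announce block geometry
    jsonData={"command":"init", "blockSize":clientTree.BLOCK_SIZE, "blockCount":clientTree.leafCount}

    # server side: refuse to sync trees with mismatched geometry
    assert jsonData["blockSize"]==serverTree.BLOCK_SIZE
    assert jsonData["blockCount"]==serverTree.leafCount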
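
As a quick sanity check of the sharding scheme in getPath, the snippet below is a standalone copy of the function with work.py's constants; the sample sector id is arbitrary. With FILES_PER_DIR=100 and FILE_COUNT=2**21 it yields four path levels, each level dividing the index by a further factor of 100.

    import math
    import pathlib

    FILES_PER_DIR=100
    FILE_COUNT=2**21

    def getPath(index):
        # ceil(log_100(2**21)) = 4 directory levels
        nodeIds=[]
        k=1
        for _ in range(math.ceil(math.log(FILE_COUNT,FILES_PER_DIR))):
            nodeIds.append(str(index//k))
            k*=FILES_PER_DIR
        nodeIds.reverse()
        return pathlib.Path(*nodeIds)

    print(getPath(2097151)) # -> 2/209/20971/2097151 on POSIX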
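
For orientation, the expected invocation as inferred from the argument handling at the bottom of work.py; disk.img is a placeholder file name:

    python work.py init disk.img      # hash disk.img into disk-hash.dat and create morevna.db
    python work.py update disk.img    # log changed sectors and mark them dirty in the DB
    python work.py transfer disk.img  # copy dirty sectors into the sharded directory tree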