"""Sector-level file synchronization: hash a file in fixed-size sectors,
track changed ("dirty") sectors in SQLite, and export dirty sectors as
individual files into a nested directory tree for transfer."""

import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib
import itertools

# Size of one sector in bytes; one hash row is stored per sector.
BLOCK_SIZE = 4096
# Number of leading bytes of the SHA-256 digest kept per sector.
HASH_LEN = 8
# Path of the hash-tracking database.
DB = "morevna.db"
# Fan-out of the export directory tree and the largest sector count it addresses.
FILES_PER_DIR = 100
FILE_COUNT = 2**21


def initDB():
    """Create the `hashes` table and its dirty-flag index if missing."""
    conn = sqlite3.connect(DB)  # bugfix: was a hard-coded "morevna.db", ignoring DB
    try:
        conn.execute("""create table if not exists `hashes` (
            `sector_id` integer primary key,
            `sector_hash` blob not null,
            `dirty` integer check(`dirty` in (0,1))
        )""")
        conn.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
        conn.commit()
    finally:
        conn.close()  # bugfix: connection was leaked


def initHashes(fileName):
    """Hash every sector of fileName and insert the hashes, all marked dirty.

    Assumes the `hashes` table is empty for this file (plain INSERT).
    """
    initDB()
    db = sqlite3.connect(DB)
    try:
        db.executemany(
            """insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
            ({"id": i, "hash": dataHash} for (i, dataHash) in enumerate(chunks(fileName)))
        )
        db.commit()
    finally:
        db.close()


def chunks(fileName):
    """Yield the truncated SHA-256 hash of each BLOCK_SIZE sector of fileName.

    The final sector may be shorter than BLOCK_SIZE; it is hashed as-is.
    """
    with open(fileName, mode="rb") as f:
        data = f.read(BLOCK_SIZE)
        while data:
            yield hashlib.sha256(data).digest()[:HASH_LEN]
            data = f.read(BLOCK_SIZE)


def hashes():
    """Yield the stored sector hash blobs in ascending sector order.

    Bugfix: the original yielded the whole 1-tuple row instead of the blob,
    so the bytes-vs-tuple comparison in findChanges() never matched and
    every sector was reported as changed on every run.
    """
    db = sqlite3.connect(DB)
    try:
        cur = db.execute(
            """select `sector_hash` from `hashes` order by `sector_id` asc""")
        for (sectorHash,) in cur:
            yield sectorHash
    finally:
        db.close()


def findChanges(fileName):
    """Diff fileName against the stored hashes; log and record changed sectors.

    Writes a tab-separated log to the module-level `logFile`, then upserts
    the new hashes with dirty=1.
    """
    changelist = []
    # Stream both hash sequences in lockstep. zip_longest pads the shorter
    # side with None, so file growth shows up as savedHash=None differences.
    # Memory: up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes plus
    # overhead — (8+4)*(8*2**30/4096) = 24MB for the defaults.
    for (i, (dataHash, savedHash)) in enumerate(
            itertools.zip_longest(chunks(fileName), hashes())):
        if dataHash is None:
            # File shrank below the stored sector count; stop here.
            # Bugfix: the original appended (i, None) before breaking, which
            # would have violated the `sector_hash not null` constraint.
            break
        if dataHash != savedHash:
            changelist.append((i, dataHash))

    # write log
    with open(logFile, mode="w") as f:
        f.write("sector hash\n")
        for (i, dataHash) in changelist:
            f.write("{0}\t{1}\n".format(i, dataHash))

    # update DB. Bugfix: a plain UPDATE silently ignored sectors past the old
    # end of file; "insert or replace" covers both changed and new sectors.
    db = sqlite3.connect(DB)
    try:
        db.executemany(
            """insert or replace into `hashes` (`sector_id`,`sector_hash`,`dirty`)
               values (:id,:hash,1)""",
            ({"id": i, "hash": dataHash} for (i, dataHash) in changelist)
        )
        db.commit()
    finally:
        db.close()


def transferChanges(targetPath):
    """Copy every dirty sector of the module-level `fileName` into the tree
    under targetPath (one file per sector, see getPath), then mark them clean.
    """
    with sqlite3.connect(DB) as db, open(fileName, mode="rb") as sf:
        handle = db.cursor()
        handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")
        # transfer modified sectors and mark them as clean
        sectorIds = handle.fetchall()
        for (sectorId,) in sectorIds:
            path = targetPath / getPath(sectorId)
            path.parent.mkdir(parents=True, exist_ok=True)
            # Bugfix: seek to the sector's byte offset, not its index —
            # the original read the wrong data for every sector after #0.
            sf.seek(sectorId * BLOCK_SIZE)
            # Bugfix: close each sector file instead of leaking one handle
            # per transferred sector.
            with path.open(mode="wb") as df:
                df.write(sf.read(BLOCK_SIZE))
            handle.execute(
                """update `hashes` set `dirty`=0 where `sector_id`=?""",
                (sectorId,))
        db.commit()


def getPath(index):
    """Map a sector index to its nested relative path in the export tree.

    Each tree level divides the index by a further factor of FILES_PER_DIR,
    deepest level first, e.g. getPath(5) -> 0/0/0/5 with the defaults.
    """
    nodeIds = []
    k = 1
    while k <= FILE_COUNT:
        nodeIds.append(index // k)
        k *= FILES_PER_DIR
    nodeIds.reverse()
    # Avoid shadowing the builtin `id` (original comprehension variable).
    return pathlib.Path(*[str(nodeId) for nodeId in nodeIds])


# Guarded entry point (bugfix: the original parsed sys.argv at import time,
# making the module unimportable). fileName and logFile stay module-level
# globals because findChanges/transferChanges read them.
if __name__ == "__main__":
    action = sys.argv[1]
    fileName = sys.argv[2]
    baseName = ".".join(fileName.split(".")[:-1])
    hashFile = "{0}-hash.dat".format(baseName)
    isoDate = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
    logFile = "{0}-{1}.log".format(baseName, isoDate)

    if action == "init":
        initHashes(fileName)
    elif action == "update":
        findChanges(fileName)
    elif action == "transfer":
        transferChanges(sys.argv[3])
    else:
        print("bad action")