import hashlib
import sqlite3
import pathlib
import itertools
import logging as log
from itertools import zip_longest

from db import DB

BLOCK_SIZE=4096
HASH_LEN=8
DB_NAME="morevna.db"
FILE_COUNT=2**21
TARGET_DIR="temp"

log.basicConfig(level=log.DEBUG)


class Pipeline:
	def __init__(self,fileName):
		self.fileName=fileName
		self.db=DB(DB_NAME)
		self.actions=[]

	def setup(self):
		self.db.setup()

	def update(self):
		"""Hash the file sector by sector and write out every sector whose hash differs from the stored one."""
		log.info("starting update")
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(),self.db.listHashes())):
			if not chunk: break
			chunkHash=self.hash(chunk)
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
			if chunkHash==storedHash: continue
			log.info("updating sector %d, hash %s != stored %s",i,chunkHash.hex(),storedHash.hex())
			self.copyChunk(chunk,i)
			self.db.insertHash(i,chunkHash)
		self.db.connection.commit()
		log.info("update done")

	def chunks(self):
		"""Yield BLOCK_SIZE-sized blocks of the file; the last one may be shorter."""
		with open(self.fileName,mode="rb") as f: # !! buffering
			data=f.read(BLOCK_SIZE)
			while data:
				yield data
				data=f.read(BLOCK_SIZE)

	@staticmethod
	def hash(chunk):
		"""First HASH_LEN bytes of the SHA-256 digest."""
		return hashlib.sha256(chunk).digest()[:HASH_LEN]

	def copyChunk(self,chunk,i):
		path=self.getChunkPath(i)
		if not path.parent.exists():
			path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
		log.debug("writing file %s",path)
		with path.open(mode="wb") as f:
			f.write(chunk)

	def mergeChunks(self):
		"""Concatenate the chunk files back into self.fileName. Refuses to overwrite an existing file."""
		log.info("starting merge")
		try:
			targetFile=open(self.fileName,mode="xb")
		except FileExistsError:
			log.warning("file %s already exists, aborting",self.fileName)
			return False # !! be more specific
		i=0
		while True:
			try:
				sourcePath=self.getChunkPath(i)
				sourceFile=sourcePath.open(mode="rb")
				targetFile.write(sourceFile.read())
				sourceFile.close()
				log.info("file %s merged",sourcePath)
			except FileNotFoundError:
				break
			i+=1
		targetFile.close()
		log.info("merge done")
		return True

	def getChunkPath(self,i):
		paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
		return pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(paddedName))

	@staticmethod
	def getPath(name):
		"""Spread a padded name over nested directories, e.g. 0001234 -> 0/000/00012/0001234."""
		dirs=[name]
		for i in range(2,len(name),2):
			dirs.append(name[:-i])
		return pathlib.Path(*reversed(dirs))


# older, function-based variant working directly with sqlite3

def initDB():
	conn=sqlite3.connect(DB_NAME)
	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`dirty` integer
	)""")
	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()
	conn.close()

def chunkHashes(fileName):
	"""Yield the truncated SHA-256 hash of each BLOCK_SIZE sector of the file, in order."""
	with open(fileName,mode="rb") as f:
		data=f.read(BLOCK_SIZE)
		while data:
			yield Pipeline.hash(data)
			data=f.read(BLOCK_SIZE)

def initHashes(fileName):
	initDB()
	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.executemany(
			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
			({"id":i,"hash":dataHash} for (i,dataHash) in enumerate(chunkHashes(fileName)))
		)
		db.commit()

def hashes():
	"""Yield the stored sector hashes ordered by sector id."""
	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		row=handle.fetchone()
		while row is not None:
			yield row[0]
			row=handle.fetchone()

def findChanges(fileName,logFile="changes.log"): # logFile default is an assumption; the original name was unbound
	changelist=[]

	# build changelist
	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
	# that's (8+4)*(8*2**30 / 4096) = 24MB for defaults
	for (i,(dataHash,savedHash)) in enumerate(itertools.zip_longest(chunkHashes(fileName),hashes())):
		if dataHash!=savedHash:
			changelist.append((i,dataHash))
		if dataHash is None: break # shouldn't happen

	# write log
	with open(logFile,mode="w") as f:
		f.write("sector\thash\n")
		for (i,dataHash) in changelist:
			f.write("{0}\t{1}\n".format(i,dataHash.hex() if dataHash else ""))

	# update DB
	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.executemany(
			"""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
			({"id":i,"hash":dataHash} for (i,dataHash) in changelist)
		)
		db.commit()

def transferChanges(fileName,targetPath): # fileName parameter added; the original referenced an unbound name
	# read changes
	with sqlite3.connect(DB_NAME) as db, open(fileName,mode="rb") as sf:
		handle=db.cursor()
		handle.execute("""select `sector_id` from `hashes` where `dirty`=1 order by `sector_id` asc""")

		# transfer modified sectors and mark them as clean
		sectorIds=handle.fetchall()
		'''for (sectorId,) in sectorIds:
			path=targetPath / getPath(sectorId)
			try: path.parent.mkdir(parents=True)
			except FileExistsError: pass
			df=path.open(mode="wb")
			sf.seek(sectorId)
			df.write(sf.read(BLOCK_SIZE))
			handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
			db.commit()'''
		# sequential pass over the file (reconstructed sketch; the original loop was cut off here)
		sector=sf.read(BLOCK_SIZE)
		i=j=0
		while sector and j<len(sectorIds):
			(sectorId,)=sectorIds[j]
			if i==sectorId:
				paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(sectorId)
				path=pathlib.Path(targetPath)/Pipeline.getPath(paddedName)
				path.parent.mkdir(parents=True,exist_ok=True)
				with path.open(mode="wb") as df:
					df.write(sector)
				handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
				j+=1
			sector=sf.read(BLOCK_SIZE)
			i+=1
		db.commit()
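
# Usage sketch: an assumed entry point, not part of the original module; the
# file names and the __main__ guard are illustrative only. The sending side
# splits the image into per-sector chunk files under TARGET_DIR and records
# their hashes; the receiving side reassembles the image with mergeChunks().
if __name__=="__main__":
	pipeline=Pipeline("image.bin") # hypothetical source image
	pipeline.setup()
	pipeline.update()
	# on the receiving machine, after copying the chunk tree:
	# Pipeline("restored.bin").mergeChunks()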