diff --git a/src/work.py b/src/work.py
deleted file mode 100644
--- a/src/work.py
+++ /dev/null
@@ -1,208 +0,0 @@
-import hashlib
-import sys
-import datetime
-import sqlite3
-import math
-import pathlib
-import itertools
-import logging as log
-from itertools import zip_longest
-from db import DB
-
-BLOCK_SIZE=4096
-HASH_LEN=8
-DB_NAME="morevna.db"
-FILE_COUNT=2**21
-TARGET_DIR="temp"
-
-log.basicConfig(format="{asctime} {levelname}: {message}", style="{", level=log.DEBUG)
-
-class Pipeline:
-	def __init__(self,fileName):
-		self.fileName=fileName
-		self.db=DB(DB_NAME)
-		self.actions=[]
-
-	def setup(self):
-		self.db.setup()
-
-	def update(self):
-		log.info("starting update")
-		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(), self.db.listHashes())):
-			assert storedHashRec is None or storedHashRec["sector_id"]==i, "{0}!={1} {2}".format(i, storedHashRec["sector_id"], storedHashRec)
-			if not chunk: break
-			hash=Pipeline.hash(chunk)
-			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
-			if hash==storedHash: continue
-			log.info("updating sector %d, hash %s != stored %s", i, hash.hex(), storedHash.hex())
-			self.copyChunk(chunk,i)
-			self.db.insertHash(i,hash)
-			self.db.connection.commit()
-		log.info("update done")
-
-	def chunks(self):
-		with open(self.fileName, mode="br") as f: # !! buffering
-			data=f.read(BLOCK_SIZE)
-			while data:
-				yield data
-				data=f.read(BLOCK_SIZE)
-
-	@staticmethod
-	def hash(chunk):
-		return hashlib.sha256(chunk).digest()[:HASH_LEN]
-
-	def copyChunk(self,chunk,i):
-		path=self.getChunkPath(i)
-		if not path.parent.exists(): path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
-		log.debug("writing file %s", path)
-		f=path.open(mode="wb")
-		f.write(chunk)
-		f.close()
-
-	def mergeChunks(self):
-		log.info("starting merge")
-		try: targetFile=open(self.fileName,mode="xb")
-		except FileExistsError:
-			log.warning("file %s already exists, aborting",self.fileName)
-			return False # !! be more specific
-
-		i=0
-		while True:
-			try:
-				sourcePath=self.getChunkPath(i)
-				sourceFile=sourcePath.open(mode="rb")
-				targetFile.write(sourceFile.read())
-				sourceFile.close()
-				log.info("file %s merged",sourcePath)
-			except FileNotFoundError: break
-			i+=1
-		targetFile.close()
-		log.info("merge done")
-		return True
-
-	def getChunkPath(self,i):
-		paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
-		return pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(paddedName))
-
-	@staticmethod
-	def getPath(name):
-		dirs=[name]
-		for i in range(2,len(name),2):
-			dirs.append(name[:-i])
-		return pathlib.Path(*reversed(dirs))
-
-def initDB():
-	conn=sqlite3.connect("morevna.db")
-
-	c=conn.cursor()
-	c.execute("""create table if not exists `hashes` (
-		`sector_id` integer primary key,
-		`sector_hash` blob not null,
-		`updated` integer
-	)""")
-
-	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
-	conn.commit()
-
-def initHashes(fileName):
-	initDB()
-
-	with sqlite3.connect(DB) as db:
-		handle=db.cursor()
-		handle.executemany(
-			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
-			({"id":i,"hash":dataHash} for (i,dataHash) in enumerate(chunks(fileName)))
-		)
-		db.commit()

-def hashes():
-	with sqlite3.connect(DB) as db:
-		handle=db.cursor()
-		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
-		h=handle.fetchone()
-		while h is not None:
-			yield h
-			h=handle.fetchone()
-
-def findChanges(fileName):
-	changelist=[]
-
-	# build changelist
-	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
-	# that's (8+4)*(8*2**30 / 4096) = 24MB for defaults
-	for (i,(dataHash,savedHash)) in enumerate(itertools.zip_longest(chunks(fileName),hashes())):
-		if dataHash!=savedHash:
-			changelist.append((i,dataHash))
-		if dataHash is None: break # shouldn't happen
-
-	# write log
-	with open(logFile, mode="w") as f:
-		f.write("sector hash\n")
-		for (i,dataHash) in changelist:
-			f.write("{0}\t{1}\n".format(i,dataHash))
-
-	# update DB
-	with sqlite3.connect(DB) as db:
-		handle=db.cursor()
-		handle.executemany(
-			"""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
-			({"id":i,"hash":dataHash} for (i,dataHash) in changelist)
-		)
-		db.commit()
-
-def transferChanges(targetPath):
-	# read changes
-	with sqlite3.connect(DB) as db, open(fileName,mode="rb") as sf:
-		handle=db.cursor()
-		handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")
-
-		# transfer modified sectors and mark them as clean
-		sectorIds=handle.fetchall()
-		'''for (sectorId,) in sectorIds:
-			path=targetPath / getPath(sectorId)
-			try: path.parent.mkdir(parents=True)
-			except FileExistsError: pass
-			df=path.open(mode="wb")
-			sf.seek(sectorId)
-			df.write(sf.read(BLOCK_SIZE))
-			handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
-			db.commit()'''
-		sector=sf.read(BLOCK_SIZE)
-		i=j=0
-		while sector and j
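
For reference, the chunk files written by copyChunk() are fanned out into nested prefix directories by getPath(), so no single directory holds more than 100 entries. A minimal standalone sketch of that scheme, with the constants and logic lifted from the deleted file (the sample index 12345 and the print call are illustrative only, not part of the original):

import pathlib

FILE_COUNT=2**21    # sets the zero-padding width: len(str(2**21)) == 7 digits
TARGET_DIR="temp"

def getPath(name):
	# "0012345" -> 0/001/00123/0012345: each level drops two trailing digits,
	# keeping every directory at 100 children or fewer
	dirs=[name]
	for i in range(2,len(name),2):
		dirs.append(name[:-i])
	return pathlib.Path(*reversed(dirs))

def getChunkPath(i):
	paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
	return pathlib.Path(TARGET_DIR).joinpath(getPath(paddedName))

print(getChunkPath(12345))  # temp/0/001/00123/0012345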