diff --git a/.hgignore b/.hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -1,2 +1,3 @@
 ^src/__pycache__/
 ^\..*
+^temp/.*
diff --git a/src/db.py b/src/db.py
new file mode 100644
--- /dev/null
+++ b/src/db.py
@@ -0,0 +1,51 @@
+import sqlite3
+import time
+
+COMMIT_TIME=2 # s
+
+## Sqlite store of per-sector hashes.
+#
+# insertHash() commits lazily (at most once per COMMIT_TIME seconds),
+# so call commit() after the last insert to persist everything.
+class DB:
+	def __init__(self,filename):
+		self.filename=filename
+		self.connection=sqlite3.connect(self.filename)
+		self.connection.row_factory=sqlite3.Row
+		self.commitTime=time.time()
+
+	## Creates the `hashes` table; does nothing if it already exists.
+	def setup(self):
+		self.connection.execute("""create table if not exists `hashes` (
+			`sector_id` integer primary key,
+			`sector_hash` blob not null,
+			`updated` integer
+		)""")
+		self.commit()
+
+	## Inserts hash into the DB, replacing any hash previously stored for the sector.
+	#
+	# The change is committed only if more than COMMIT_TIME seconds passed since the last commit.
+	#
+	# @param sectorId integer index of the sector
+	# @param sectorHash bytes object containing the hash
+	# @return cursor of the executed statement
+	def insertHash(self,sectorId,sectorHash):
+		# !! if not connection
+		res=self.connection.execute("""insert or replace into `hashes` (`sector_id`, `sector_hash`, `updated`)
+			values (?, ?, strftime('%s','now'))""", (sectorId,sectorHash))
+		if time.time()-self.commitTime > COMMIT_TIME:
+			self.commit()
+		return res
+
+	## Commits pending changes and restarts the COMMIT_TIME countdown.
+	def commit(self):
+		self.connection.commit()
+		self.commitTime=time.time()
+
+	## Lists all stored hashes.
+	#
+	# @return cursor of sqlite3.Row records (`sector_id`, `sector_hash`, `updated`), ordered by `sector_id`
+	def listHashes(self):
+		# !! if not connection
+		return self.connection.execute("""select * from `hashes` order by `sector_id` asc""")
diff --git a/src/work.py b/src/work.py
--- a/src/work.py
+++ b/src/work.py
@@ -5,12 +5,77 @@ import sqlite3
 import math
 import pathlib
 import itertools
+import logging as log
+from db import DB as HashDB
 
 BLOCK_SIZE=4096
 HASH_LEN=8
-DB="morevna.db"
-FILES_PER_DIR=100
+DB_NAME="morevna.db"
+DB=DB_NAME # legacy name still read by hashes(); the DB class is therefore imported as HashDB
 FILE_COUNT=2**21
+TARGET_DIR="temp"
+
+log.basicConfig(level=log.DEBUG)
+
+## Mirrors fileName into TARGET_DIR as one file per BLOCK_SIZE sector,
+# copying only sectors whose hash differs from the one stored in DB_NAME.
+class Pipeline:
+	def __init__(self,fileName):
+		self.fileName=fileName
+		self.db=HashDB(DB_NAME)
+		self.actions=[]
+
+	## Creates the hash table in DB_NAME.
+	def setup(self):
+		self.db.setup()
+
+	## Copies every changed sector to TARGET_DIR and stores its new hash.
+	def update(self):
+		log.info("starting update")
+		stored=iter(self.db.listHashes())
+		rec=next(stored,None)
+		for (i,chunk) in enumerate(self.chunks()):
+			# pair chunks with stored rows by sector_id, not by position, so gaps in the table can't shift the comparison;
+			# this also skips rows already rewritten by this loop if the still-open cursor returns them
+			while rec is not None and rec["sector_id"]<i: rec=next(stored,None)
+			storedHash=rec["sector_hash"] if rec is not None and rec["sector_id"]==i else b"\0"
+			sectorHash=self.hash(chunk)
+			if sectorHash==storedHash: continue
+			log.info("updating sector %d, hash %s != stored %s", i, sectorHash.hex(), storedHash.hex())
+			self.copyChunk(chunk,i)
+			self.db.insertHash(i,sectorHash)
+		self.db.commit()
+		log.info("update done")
+
+	## Yields the file contents in BLOCK_SIZE chunks (the last one may be shorter).
+	def chunks(self):
+		with open(self.fileName, mode="br") as f: # !! buffering
+			data=f.read(BLOCK_SIZE)
+			while data:
+				yield data
+				data=f.read(BLOCK_SIZE)
+
+	## Returns the first HASH_LEN bytes of the chunk's SHA-256 digest.
+	@staticmethod
+	def hash(chunk):
+		return hashlib.sha256(chunk).digest()[:HASH_LEN]
+
+	## Writes the chunk to TARGET_DIR under the path of its zero-padded sector index.
+	def copyChunk(self,chunk,i):
+		name=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
+		path=pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(name))
+		path.parent.mkdir(parents=True,exist_ok=True) # !! still fails if a file occupies a dir name
+		log.debug("writing file %s", path)
+		with path.open(mode="wb") as f:
+			f.write(chunk)
+
+	## Nests a name under directories named by its prefixes, e.g. "0001234" -> 0/000/00012/0001234
+	@staticmethod
+	def getPath(name):
+		dirs=[name]
+		for i in range(2,len(name),2):
+			dirs.append(name[:-i])
+		return pathlib.Path(*reversed(dirs))
 
 def initDB():
 	conn=sqlite3.connect("morevna.db")
@@ -19,7 +84,7 @@ def initDB():
 	c.execute("""create table if not exists `hashes` (
 		`sector_id` integer primary key,
 		`sector_hash` blob not null,
-		`dirty` integer check(`dirty` in (0,1))
+		`updated` integer
 		)""")
 
 	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
@@ -36,13 +101,6 @@ def initHashes(fileName):
 		)
 		db.commit()
 
-def chunks(fileName):
-	with open(fileName, mode="br") as f:
-		data=f.read(BLOCK_SIZE)
-		while data:
-			yield hashlib.sha256(data).digest()[:HASH_LEN]
-			data=f.read(BLOCK_SIZE)
-
 def hashes():
 	with sqlite3.connect(DB) as db:
 		handle=db.cursor()
@@ -127,8 +185,9 @@ hashFile="{0}-hash.dat".format(baseName)
 isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
 logFile="{0}-{1}.log".format(baseName,isoDate)
 
-if action=="init": initHashes(fileName)
-elif action=="update": findChanges(fileName)
-elif action=="transfer": transferChanges(sys.argv[3])
+pipeline=Pipeline(fileName)
+if action=="setup": pipeline.setup()
+elif action=="update": pipeline.update()
+#elif action=="transfer": transferChanges(sys.argv[3])
 else: print("bad action")
 