Changeset - 3ba409cd6541
[Not reviewed]
default
Laman - 9 years ago 2016-08-10 21:20:13

rebuild command to get the original file
1 file changed with 27 insertions and 1 deletion:
0 comments (0 inline, 0 general)
src/work.py
import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib
import itertools
import logging as log
from itertools import zip_longest
from db import DB

BLOCK_SIZE=4096
HASH_LEN=8
DB_NAME="morevna.db"
FILE_COUNT=2**21
FILES_PER_DIR=100 # assumed value, not in the original: matches the two-digit directory levels of Pipeline.getPath; needed by the module-level getPath below
TARGET_DIR="temp"

log.basicConfig(level=log.DEBUG)
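
# sanity check on the defaults: the padded chunk names assume at most FILE_COUNT sectors,
# and FILE_COUNT*BLOCK_SIZE = 2**21*4096 = 2**33 bytes, i.e. files of up to 8GiB
# at one 8-byte hash per 4KiB sector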

class Pipeline:
	def __init__(self,fileName):
		self.fileName=fileName
		self.db=DB(DB_NAME)
		self.actions=[]

	def setup(self):
		self.db.setup()

	def update(self):
		log.info("starting update")
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(),self.db.listHashes())):
			if not chunk: break
			chunkHash=self.hash(chunk)
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
			if chunkHash==storedHash: continue
			log.info("updating sector %d, hash %s != stored %s",i,chunkHash.hex(),storedHash.hex())
			self.copyChunk(chunk,i)
			self.db.insertHash(i,chunkHash)
		self.db.connection.commit()
		log.info("update done")

	def chunks(self):
		with open(self.fileName,mode="rb") as f: # !! buffering
			data=f.read(BLOCK_SIZE)
			while data:
				yield data
				data=f.read(BLOCK_SIZE)

	@staticmethod
	def hash(chunk):
		return hashlib.sha256(chunk).digest()[:HASH_LEN]
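
	# the SHA-256 digest is truncated to HASH_LEN=8 bytes (64 bits) to keep hash records small;
	# by the birthday bound, 2**21 sectors collide with probability about 2**42/2**65 ≈ 1e-7,
	# which seems acceptable for change detection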

	def copyChunk(self,chunk,i):
		path=self.getChunkPath(i)
		if not path.parent.exists(): path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
		log.debug("writing file %s",path)
		with path.open(mode="wb") as f:
			f.write(chunk)

	def mergeChunks(self):
		log.info("starting merge")
		try: targetFile=open(self.fileName,mode="xb")
		except FileExistsError:
			log.warning("file %s already exists, aborting",self.fileName)
			return False # !! be more specific

		i=0
		while True:
			try:
				sourcePath=self.getChunkPath(i)
				sourceFile=sourcePath.open(mode="rb")
				targetFile.write(sourceFile.read())
				sourceFile.close()
				log.info("file %s merged",sourcePath)
			except FileNotFoundError: break
			i+=1
		targetFile.close()
		log.info("merge done")
		return True
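
	# note: the merge reads chunk files 0,1,2,... and stops at the first missing one,
	# so the rebuild only reproduces the original file if the chunk set in TARGET_DIR is complete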

	def getChunkPath(self,i):
		paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
		return pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(paddedName))

	@staticmethod
	def getPath(name):
		dirs=[name]
		for i in range(2,len(name),2):
			dirs.append(name[:-i])
		return pathlib.Path(*reversed(dirs))
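
	# worked example (illustrative index): with FILE_COUNT=2**21, names are zero-padded to
	# len(str(2**21))=7 digits and getPath peels off two digits per directory level, so
	# getChunkPath(1234) -> temp/0/000/00012/0001234, and no directory holds more than 100 entries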

def initDB():
	conn=sqlite3.connect(DB_NAME)

	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`dirty` integer, -- needed by dirty_index and the update/transfer queries below
		`updated` integer
	)""")

	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()

# the module-level functions below are an older variant of the Pipeline methods above;
# only transferChanges is referenced from the CLI dispatch, and that line is commented out
def initHashes(fileName):
	initDB()

	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.executemany(
			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
			# hash every sector; chunks() and hash() now live on Pipeline
			({"id":i,"hash":Pipeline.hash(chunk)} for (i,chunk) in enumerate(Pipeline(fileName).chunks()))
		)
		db.commit()

def hashes():
	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		row=handle.fetchone()
		while row is not None:
			yield row[0] # unwrap the 1-tuple so callers get raw hash bytes
			row=handle.fetchone()

def findChanges(fileName):
	changelist=[]

	# build changelist
	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
	# that's (8+4)*(8*2**30/4096) = 12*2**21 bytes = 24MiB for an 8GiB file with defaults
	for (i,(chunk,savedHash)) in enumerate(itertools.zip_longest(Pipeline(fileName).chunks(),hashes())):
		if chunk is None: break # more stored hashes than sectors; shouldn't happen
		dataHash=Pipeline.hash(chunk)
		if dataHash!=savedHash:
			changelist.append((i,dataHash)) # !! for brand-new sectors the update below matches no row

	# write log
	with open(logFile,mode="w") as f:
		f.write("sector\thash\n")
		for (i,dataHash) in changelist:
			f.write("{0}\t{1}\n".format(i,dataHash.hex()))

	# update DB
	with sqlite3.connect(DB_NAME) as db:
		handle=db.cursor()
		handle.executemany(
			"""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
			({"id":i,"hash":dataHash} for (i,dataHash) in changelist)
		)
		db.commit()

def transferChanges(targetPath):
	targetPath=pathlib.Path(targetPath) # accept a plain string from argv
	# read changes
	with sqlite3.connect(DB_NAME) as db, open(fileName,mode="rb") as sf:
		handle=db.cursor()
		handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")

		# transfer modified sectors and mark them as clean
		sectorIds=handle.fetchall()
		'''for (sectorId,) in sectorIds:
			path=targetPath / getPath(sectorId)
			try: path.parent.mkdir(parents=True)
			except FileExistsError: pass
			df=path.open(mode="wb")
			sf.seek(sectorId*BLOCK_SIZE) # seek by bytes, not sectors
			df.write(sf.read(BLOCK_SIZE))
			handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
			db.commit()'''
		sector=sf.read(BLOCK_SIZE)
		i=j=0
		while sector and j<len(sectorIds):
			if i==sectorIds[j][0]:
				path=targetPath / getPath(i)
				try: path.parent.mkdir(parents=True)
				except FileExistsError: pass
				with path.open(mode="wb") as df:
					df.write(sector)
				handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(i,))
				db.commit()
				j+=1
			i+=1
			sector=sf.read(BLOCK_SIZE)

def getPath(index):
	nodeIds=[]
	k=1
	while k<=FILE_COUNT:
		paddedLen=len(str(FILE_COUNT//k)) # pad to the width of the largest index at this level
		nodeIds.append("{0:0{1}}".format(index//k,paddedLen))
		k*=FILES_PER_DIR
	nodeIds.reverse()
	return pathlib.Path(*nodeIds)
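
# assuming FILES_PER_DIR=100 as defined above, this mirrors Pipeline.getPath on zero-padded
# names, e.g. getPath(1234) -> 0/000/00012/0001234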

action=sys.argv[1]
fileName=sys.argv[2]
baseName=".".join(fileName.split(".")[:-1])
hashFile="{0}-hash.dat".format(baseName)
isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
logFile="{0}-{1}.log".format(baseName,isoDate)

pipeline=Pipeline(fileName)
if action=="setup": pipeline.setup()
elif action=="update": pipeline.update()
#elif action=="transfer": transferChanges(sys.argv[3])
elif action=="rebuild": pipeline.mergeChunks()
else: print("bad action")
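
# typical invocations (file name illustrative):
#   python work.py setup disk.img    # create the hash DB
#   python work.py update disk.img   # copy changed sectors into temp/ and record their hashes
#   python work.py rebuild disk.img  # reassemble disk.img from the chunks in temp/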