Files @ 1c3d9df86fb1
Branch filter:

Location: Morevna/src/work.py - annotation

Laman
switched from persistent to transient sockets
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
95bfc9d72133
95bfc9d72133
95bfc9d72133
59339cfb3d80
59339cfb3d80
59339cfb3d80
95bfc9d72133
59339cfb3d80
95bfc9d72133
95bfc9d72133
6f31f20feb06
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
6f31f20feb06
95bfc9d72133
6f31f20feb06
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
3ba409cd6541
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
3ba409cd6541
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
95bfc9d72133
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
59339cfb3d80
95bfc9d72133
95bfc9d72133
95bfc9d72133
95bfc9d72133
3ba409cd6541
59339cfb3d80
59339cfb3d80
import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib
import itertools
import logging as log
from itertools import zip_longest
from db import DB

BLOCK_SIZE=4096
HASH_LEN=8
DB_NAME="morevna.db"
FILE_COUNT=2**21
TARGET_DIR="temp"

log.basicConfig(format="{asctime} {levelname}: {message}", style="{", level=log.DEBUG)

class Pipeline:
	def __init__(self,fileName):
		self.fileName=fileName
		self.db=DB(DB_NAME)
		self.actions=[]

	def setup(self):
		self.db.setup()

	def update(self):
		log.info("starting update")
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(), self.db.listHashes())):
			assert storedHashRec is None or storedHashRec["sector_id"]==i, "{0}!={1} {2}".format(i, storedHashRec["sector_id"], storedHashRec)
			if not chunk: break
			hash=Pipeline.hash(chunk)
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
			if hash==storedHash: continue
			log.info("updating sector %d, hash %s != stored %s", i, hash.hex(), storedHash.hex())
			self.copyChunk(chunk,i)
			self.db.insertHash(i,hash)
		self.db.connection.commit()
		log.info("update done")

	def chunks(self):
		with open(self.fileName, mode="br") as f: # !! buffering
			data=f.read(BLOCK_SIZE)
			while data:
				yield data
				data=f.read(BLOCK_SIZE)

	@staticmethod
	def hash(chunk):
		return hashlib.sha256(chunk).digest()[:HASH_LEN]

	def copyChunk(self,chunk,i):
		path=self.getChunkPath(i)
		if not path.parent.exists(): path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
		log.debug("writing file %s", path)
		f=path.open(mode="wb")
		f.write(chunk)
		f.close()

	def mergeChunks(self):
		log.info("starting merge")
		try: targetFile=open(self.fileName,mode="xb")
		except FileExistsError:
			log.warning("file %s already exists, aborting",self.fileName)
			return False # !! be more specific

		i=0
		while True:
			try:
				sourcePath=self.getChunkPath(i)
				sourceFile=sourcePath.open(mode="rb")
				targetFile.write(sourceFile.read())
				sourceFile.close()
				log.info("file %s merged",sourcePath)
			except FileNotFoundError: break
			i+=1
		targetFile.close()
		log.info("merge done")
		return True

	def getChunkPath(self,i):
		paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
		return pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(paddedName))

	@staticmethod
	def getPath(name):
		dirs=[name]
		for i in range(2,len(name),2):
			dirs.append(name[:-i])
		return pathlib.Path(*reversed(dirs))

def initDB():
	conn=sqlite3.connect("morevna.db")

	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`updated` integer
	)""")
	
	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()

def initHashes(fileName):
	initDB()
			
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.executemany(
			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
			({"id":i,"hash":dataHash} for (i,dataHash) in enumerate(chunks(fileName)))
		)
		db.commit()

def hashes():
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		h=handle.fetchone()
		while h is not None:
			yield h
			h=handle.fetchone()

def findChanges(fileName):
	changelist=[]

	# build changelist
	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead
	# that's (8+4)*(8*2**30 / 4096) = 24MB for defaults
	for (i,(dataHash,savedHash)) in enumerate(itertools.zip_longest(chunks(fileName),hashes())):
		if dataHash!=savedHash:
			changelist.append((i,dataHash))
		if dataHash is None: break # shouldn't happen

	# write log
	with open(logFile, mode="w") as f:
		f.write("sector	hash\n")
		for (i,dataHash) in changelist:
			f.write("{0}\t{1}\n".format(i,dataHash))
			
	# update DB
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.executemany(
			"""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
			({"id":i,"hash":dataHash} for (i,dataHash) in changelist)
		)
		db.commit()
	
def transferChanges(targetPath):
	# read changes
	with sqlite3.connect(DB) as db, open(fileName,mode="rb") as sf:
		handle=db.cursor()
		handle.execute("""select `sector_id` from `hashes` where `dirty`=1""")
			
		# transfer modified sectors and mark them as clean
		sectorIds=handle.fetchall()
		'''for (sectorId,) in sectorIds:
			path=targetPath / getPath(sectorId)
			try: path.parent.mkdir(parents=True)
			except FileExistsError: pass
			df=path.open(mode="wb")
			sf.seek(sectorId)
			df.write(sf.read(BLOCK_SIZE))
			handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorId,))
			db.commit()'''
		sector=sf.read(BLOCK_SIZE)
		i=j=0
		while sector and j<len(sectorIds):
			if i==sectorIds[j][0]:
				path=targetPath / getPath(sectorId)
				try: path.parent.mkdir(parents=True)
				except FileExistsError: pass
				df=path.open(mode="wb")
				df.write(sector)
				handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(sectorIds[j][0],))
				db.commit()
				j+=1
			i+=1
			sector=sf.read(BLOCK_SIZE)

def getPath(index):
	nodeIds=[]
	k=1
	while k<=FILE_COUNT:
		paddedLen=int(math.log10(FILE_COUNT/k))
		nodeIds.append("{0:0{1}}",index//k,paddedLen)
		k*=FILES_PER_DIR
	nodeIds.reverse()
	return pathlib.Path(*[id for id in enumerate(nodeIds)])

action=sys.argv[1]
fileName=sys.argv[2]
baseName=".".join(fileName.split(".")[:-1])
hashFile="{0}-hash.dat".format(baseName)
isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
logFile="{0}-{1}.log".format(baseName,isoDate)

pipeline=Pipeline(fileName)
if action=="setup": pipeline.setup()
elif action=="update": pipeline.update()
#elif action=="transfer": transferChanges(sys.argv[3])
elif action=="rebuild": pipeline.mergeChunks()
else: print("bad action")