Changeset - 95bfc9d72133
[Not reviewed]
default
0 2 1
Laman - 9 years ago 2016-08-07 22:41:27

transfering sectors to separate files
3 files changed with 99 insertions and 13 deletions:
0 comments (0 inline, 0 general)
.hgignore
Show inline comments
 
^src/__pycache__/
 
^\..*
 
^temp/.*
src/db.py
Show inline comments
 
new file 100644
 
import sqlite3
 
import time
 

	
 
COMMIT_TIME=2 # s
 

	
 
class DB:
 
	def __init__(self,filename):
 
		self.filename=filename
 
		self.connection=sqlite3.connect(self.filename)
 
		self.connection.row_factory=sqlite3.Row
 
		self.commitTime=time.time()
 

	
 
	def setup(self):
 
		# !! err if exists
 
		self.connection.execute("""create table `hashes` (
 
			`sector_id` integer primary key,
 
			`sector_hash` blob not null,
 
			`updated` integer
 
		)""")
 

	
 
		self.connection.commit()
 

	
 
	## Inserts hash into the DB.
 
	#
 
	# @param sectorHash bytes object containing the hash
 
	def insertHash(self,sectorId,sectorHash):
 
		# !! if not connection
 
		res=self.connection.execute("""insert or replace into `hashes` (`sector_id`, `sector_hash`, `updated`)
 
			values (?, ?, strftime('%s','now'))""", (sectorId,sectorHash))
 
		t=time.time()
 
		if t-self.commitTime > COMMIT_TIME:
 
			self.connection.commit()
 
			self.commitTime=t
 
		return res
 

	
 
	def listHashes(self):
 
		# !! if not connection
 
		return self.connection.execute("""select * from `hashes` order by `sector_id` asc""")
src/work.py
Show inline comments
 
@@ -5,12 +5,65 @@ import sqlite3
 
import math
 
import pathlib
 
import itertools
 
import logging as log
 
from itertools import zip_longest
 
from db import DB
 

	
 
BLOCK_SIZE=4096
 
HASH_LEN=8
 
DB="morevna.db"
 
FILES_PER_DIR=100
 
DB_NAME="morevna.db"
 
FILE_COUNT=2**21
 
TARGET_DIR="temp"
 

	
 
log.basicConfig(level=log.DEBUG)
 

	
 
class Pipeline:
 
	def __init__(self,fileName):
 
		self.fileName=fileName
 
		self.db=DB(DB_NAME)
 
		self.actions=[]
 

	
 
	def setup(self):
 
		self.db.setup()
 

	
 
	def update(self):
 
		log.info("starting update")
 
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(), self.db.listHashes())):
 
			if not chunk: break
 
			hash=self.hash(chunk)
 
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
 
			if hash==storedHash: continue
 
			log.info("updating sector %d, hash %s != stored %s", i, hash.hex(), storedHash.hex())
 
			self.copyChunk(chunk,i)
 
			self.db.insertHash(i,hash)
 
		self.db.connection.commit()
 
		log.info("update done")
 

	
 
	def chunks(self):
 
		with open(self.fileName, mode="br") as f: # !! buffering
 
			data=f.read(BLOCK_SIZE)
 
			while data:
 
				yield data
 
				data=f.read(BLOCK_SIZE)
 

	
 
	@staticmethod
 
	def hash(chunk):
 
		return hashlib.sha256(chunk).digest()[:HASH_LEN]
 

	
 
	def copyChunk(self,chunk,i):
 
		path=pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)))
 
		if not path.parent.exists(): path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
 
		log.debug("writing file %s", path)
 
		f=path.open(mode="wb")
 
		f.write(chunk)
 
		f.close()
 

	
 
	@staticmethod
 
	def getPath(name):
 
		dirs=[name]
 
		for i in range(2,len(name),2):
 
			dirs.append(name[:-i])
 
		return pathlib.Path(*reversed(dirs))
 

	
 
def initDB():
 
	conn=sqlite3.connect("morevna.db")
 
@@ -19,7 +72,7 @@ def initDB():
 
	c.execute("""create table if not exists `hashes` (
 
		`sector_id` integer primary key,
 
		`sector_hash` blob not null,
 
		`dirty` integer check(`dirty` in (0,1))
 
		`updated` integer
 
	)""")
 
	
 
	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
 
@@ -36,13 +89,6 @@ def initHashes(fileName):
 
		)
 
		db.commit()
 

	
 
def chunks(fileName):
 
	with open(fileName, mode="br") as f:
 
		data=f.read(BLOCK_SIZE)
 
		while data:			
 
			yield hashlib.sha256(data).digest()[:HASH_LEN]
 
			data=f.read(BLOCK_SIZE)
 

	
 
def hashes():
 
	with sqlite3.connect(DB) as db:
 
		handle=db.cursor()
 
@@ -127,8 +173,9 @@ hashFile="{0}-hash.dat".format(baseName)
 
isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
 
logFile="{0}-{1}.log".format(baseName,isoDate)
 

	
 
if action=="init": initHashes(fileName)
 
elif action=="update": findChanges(fileName)
 
elif action=="transfer": transferChanges(sys.argv[3])
 
pipeline=Pipeline(fileName)
 
if action=="setup": pipeline.setup()
 
elif action=="update": pipeline.update()
 
#elif action=="transfer": transferChanges(sys.argv[3])
 
else: print("bad action")
 

	
0 comments (0 inline, 0 general)