Changeset - 6f31f20feb06
[Not reviewed]
Branch: default
Laman - 2017-02-02 21:53:26

client organized into functions
4 files changed with 79 insertions and 59 deletions:
src/client.py
 
from hashtree import HashTree
import collections
-from networkers import NetworkReader,NetworkWriter
import socket
import sys
+import config as conf
+from networkers import NetworkReader,NetworkWriter

-localTree=HashTree.fromFile(open("clientFile.txt",mode="rb"))
-
-HOST = '127.0.0.1'    # The remote host
-PORT = 50009              # The same port as used by the server
-s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-s.connect((HOST, PORT))
-fr=s.makefile(mode='rb')
-fw=s.makefile(mode='wb')
-
-networkReader=NetworkReader(fr)
-networkReader.start()
-networkWriter=NetworkWriter(fw)
-networkWriter.start()
-
-blocksToTransfer=[]
-nodeStack=collections.deque([0]) # root
-incoming=networkReader.output # synchronized message queue
-outcoming=networkWriter.input
-
-# initialize session
-jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount}
-outcoming.put((jsonData,b""))
-
-# determine which blocks to send
-while len(nodeStack)>0:
-  i=nodeStack.pop()
-  jsonData={"command":"req", "index":i}
-  outcoming.put((jsonData,b""))
-
-  jsonData,binData=incoming.get(timeout=2)
-  assert jsonData["index"]==i
-  assert jsonData["dataType"]=="hash"
-
-  if localTree.store[i]!=binData:
-    if 2*i+3<len(localTree.store): # inner node
-      nodeStack.append(2*i+2)
-      nodeStack.append(2*i+1)
-    else: blocksToTransfer.append(i-localTree.leafStart) # leaf
-
-# send the actual data
-print(blocksToTransfer)
-dataFile=open("clientFile.txt",mode="rb")
-
-for i in blocksToTransfer:
-  jsonData={"command":"send", "index":i, "dataType":"data"}
-  dataFile.seek(i*localTree.BLOCK_SIZE)
-  binData=dataFile.read(localTree.BLOCK_SIZE)
-
-  print("block #{0}: {1}...{2}".format(i,binData[:5],binData[-5:]))
-
-  outcoming.put((jsonData,binData),timeout=2)
-
-jsonData={"command":"end"}
-outcoming.put((jsonData,b""),timeout=2)
-
-outcoming.put(None)
-print("closing...")
-dataFile.close()
-# fr.close()
-# fw.close()
-s.close()
-sys.exit(0)
+
+def connect():
+	HOST = conf.hosts[0]    # The remote host
+	PORT = conf.port              # The same port as used by the server
+	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+	s.connect((HOST, PORT))
+	fr=s.makefile(mode='rb')
+	fw=s.makefile(mode='wb')
+
+	networkReader=NetworkReader(fr)
+	networkReader.start()
+	networkWriter=NetworkWriter(fw)
+	networkWriter.start()
+
+	incoming=networkReader.output # synchronized message queue
+	outcoming=networkWriter.input
+
+	return (s,incoming,outcoming)
+
+
+def negotiate(incoming,outcoming):
+	localTree=HashTree.fromFile(open("clientFile.txt",mode="rb"))
+	blocksToTransfer=[]
+	nodeStack=collections.deque([0]) # root
+
+	# initialize session
+	jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
+	outcoming.put((jsonData,b""))
+
+	# determine which blocks to send
+	while len(nodeStack)>0:
+		i=nodeStack.pop()
+		jsonData={"command":"req", "index":i}
+		outcoming.put((jsonData,b""))
+
+		jsonData,binData=incoming.get(timeout=2)
+		assert jsonData["index"]==i
+		assert jsonData["dataType"]=="hash"
+
+		if localTree.store[i]!=binData:
+			if 2*i+3<len(localTree.store): # inner node
+				nodeStack.append(2*i+2)
+				nodeStack.append(2*i+1)
+			else: blocksToTransfer.append(i-localTree.leafStart) # leaf
+
+	return blocksToTransfer
+
+
+def sendData(outcoming,blocksToTransfer):
+	print(blocksToTransfer)
+	dataFile=open("clientFile.txt",mode="rb")
+
+	for i in blocksToTransfer:
+		jsonData={"command":"send", "index":i, "dataType":"data"}
+		dataFile.seek(i*HashTree.BLOCK_SIZE)
+		binData=dataFile.read(HashTree.BLOCK_SIZE)
+
+		print("block #{0}: {1}...{2}".format(i,binData[:5],binData[-5:]))
+
+		outcoming.put((jsonData,binData),timeout=2)
+
+	jsonData={"command":"end"}
+	outcoming.put((jsonData,b""),timeout=2)
+
+	outcoming.put(None)
+	print("closing...")
+	dataFile.close()
+
+
+if __name__=="__main__":
+	sock,incoming,outcoming = connect()
+	blocksToTransfer=negotiate(incoming,outcoming)
+	sendData(outcoming,blocksToTransfer)
+
+	sock.close()
+	sys.exit(0)
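The negotiation above relies on localTree.store being an implicit binary tree: the children of node i sit at indices 2*i+1 and 2*i+2, and a leaf whose hash differs is translated to a data block number as i-localTree.leafStart. The session itself is a plain message sequence on the queues: one "init" message, a "req"/hash exchange per visited node, one "send" message per differing block, then "end" and a final None to stop the writer. A small worked example of the indexing, using an illustrative 4-block tree (the concrete size and leafStart=3 are not taken from the changeset):

def children(i):
	# children of node i in an implicitly stored binary tree
	return (2*i+1, 2*i+2)

# 7-node tree over 4 data blocks: node 0 is the root, 1-2 are inner, 3-6 are leaves
leafStart=3
assert children(0)==(1,2) and children(1)==(3,4) and children(2)==(5,6)
assert [i-leafStart for i in (3,4,5,6)]==[0,1,2,3] # leaf node -> data block index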
src/config.py
 
new file 100644
 
version=0

hosts=["10.0.0.33"]
port=50009
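The refactored client reads these settings as a plain module; roughly (a usage sketch, with the values being the ones committed here):

import config as conf

HOST=conf.hosts[0]  # "10.0.0.33": connect() uses the first listed host
PORT=conf.port      # 50009
# conf.version is included in the "init" message sent by negotiate()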
src/db.py
 
import sqlite3
import time

COMMIT_TIME=2 # s


class DB:
	def __init__(self,filename):
		self.filename=filename
		self.connection=sqlite3.connect(self.filename)
		self.connection.row_factory=sqlite3.Row
		self.commitTime=time.time()

	def setup(self):
		# !! err if exists
		self.connection.execute("""create table `hashes` (
			`sector_id` integer primary key,
			`sector_hash` blob not null,
			`updated` integer
		)""")

		self.connection.commit()

	## Inserts hash into the DB.
	#
	# @param sectorHash bytes object containing the hash
	def insertHash(self,sectorId,sectorHash):
		# !! if not connection
		res=self.connection.execute("""insert or replace into `hashes` (`sector_id`, `sector_hash`, `updated`)
			values (?, ?, strftime('%s','now'))""", (sectorId,sectorHash))
		t=time.time()
		if t-self.commitTime > COMMIT_TIME:
			self.connection.commit()
			self.commitTime=t
		return res

	def listHashes(self):
		# !! if not connection
		return self.connection.execute("""select * from `hashes` order by `sector_id` asc""")
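DB batches its commits: insertHash only commits when more than COMMIT_TIME seconds have passed since the previous commit, so a caller is expected to commit explicitly at the end (work.py's update() does exactly that via self.db.connection.commit()). A minimal usage sketch, assuming the morevna.db file name used by work.py and illustrative hash values:

from db import DB

db=DB("morevna.db")
db.setup()  # creates the `hashes` table; raises if it already exists (the "!! err if exists" remark above)
for sectorId,sectorHash in [(0,b"\x00"*8),(1,b"\xff"*8)]:
	db.insertHash(sectorId,sectorHash)  # commits at most once per COMMIT_TIME seconds
db.connection.commit()  # flush whatever insertHash left pending
for row in db.listHashes():
	print(row["sector_id"],row["sector_hash"])  # rows are sqlite3.Row objects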
src/work.py
 
import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib
import itertools
import logging as log
from itertools import zip_longest
from db import DB

BLOCK_SIZE=4096
HASH_LEN=8
DB_NAME="morevna.db"
FILE_COUNT=2**21
TARGET_DIR="temp"

-log.basicConfig(level=log.DEBUG)
+log.basicConfig(format="{asctime} {levelname}: {message}", style="{", level=log.DEBUG)


class Pipeline:
	def __init__(self,fileName):
		self.fileName=fileName
		self.db=DB(DB_NAME)
		self.actions=[]

	def setup(self):
		self.db.setup()

	def update(self):
		log.info("starting update")
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(), self.db.listHashes())):
			assert storedHashRec is None or storedHashRec["sector_id"]==i, "{0}!={1} {2}".format(i, storedHashRec["sector_id"], storedHashRec)
			if not chunk: break
-			hash=self.hash(chunk)
+			hash=Pipeline.hash(chunk)
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
			if hash==storedHash: continue
			log.info("updating sector %d, hash %s != stored %s", i, hash.hex(), storedHash.hex())
			self.copyChunk(chunk,i)
			self.db.insertHash(i,hash)
		self.db.connection.commit()
		log.info("update done")

	def chunks(self):
		with open(self.fileName, mode="br") as f: # !! buffering
			data=f.read(BLOCK_SIZE)
			while data:
				yield data
				data=f.read(BLOCK_SIZE)

	@staticmethod
	def hash(chunk):
		return hashlib.sha256(chunk).digest()[:HASH_LEN]

	def copyChunk(self,chunk,i):
		path=self.getChunkPath(i)
		if not path.parent.exists(): path.parent.mkdir(parents=True) # !! no fail? existing file named as a dir
		log.debug("writing file %s", path)
		f=path.open(mode="wb")
		f.write(chunk)
		f.close()

	def mergeChunks(self):
		log.info("starting merge")
		try: targetFile=open(self.fileName,mode="xb")
		except FileExistsError:
			log.warning("file %s already exists, aborting",self.fileName)
			return False # !! be more specific

		i=0
		while True:
			try:
				sourcePath=self.getChunkPath(i)
				sourceFile=sourcePath.open(mode="rb")
				targetFile.write(sourceFile.read())
				sourceFile.close()
				log.info("file %s merged",sourcePath)
			except FileNotFoundError: break
			i+=1
		targetFile.close()
		log.info("merge done")
		return True

	def getChunkPath(self,i):
		paddedName=("{0:0"+str(len(str(FILE_COUNT)))+"}").format(i)
		return pathlib.Path(TARGET_DIR).joinpath(Pipeline.getPath(paddedName))

	@staticmethod
	def getPath(name):
		dirs=[name]
		for i in range(2,len(name),2):
			dirs.append(name[:-i])
		return pathlib.Path(*reversed(dirs))


def initDB():
	conn=sqlite3.connect("morevna.db")

	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`updated` integer
	)""")

	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()


def initHashes(fileName):
	initDB()

	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.executemany(
			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
			({"id":i,"hash":dataHash} for (i,dataHash) in enumerate(chunks(fileName)))
		)
		db.commit()


def hashes():
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		h=handle.fetchone()
		while h is not None:
			yield h
			h=handle.fetchone()


def findChanges(fileName):
	changelist=[]

	# build changelist
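For reference, the chunk files written by copyChunk are sharded into nested directories by getChunkPath/getPath above: the chunk number is zero-padded to the width of FILE_COUNT, and the directory levels are successively longer prefixes of that padded name, growing by two digits per level. A worked example with an illustrative chunk number (getPath is copied from the class above):

import pathlib

def getPath(name):
	dirs=[name]
	for i in range(2,len(name),2):
		dirs.append(name[:-i])
	return pathlib.Path(*reversed(dirs))

width=len(str(2**21))                      # FILE_COUNT=2**21 -> 7 digits
paddedName="{0:0{1}}".format(1234,width)   # -> "0001234"
assert getPath(paddedName)==pathlib.Path("0","000","00012","0001234")
# getChunkPath() then prepends TARGET_DIR, i.e. temp/0/000/00012/0001234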