Changeset - 3d0876534e40
[Not reviewed]
default
0 2 0
Laman - 7 years ago 2017-10-18 12:08:11

client can pull changes
2 files changed with 47 insertions and 0 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 

	
 

	
 
class Connection:
 
	def __init__(self,sslContext):
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self._filename=filename
 

	
 
		self._ssl=ssl.create_default_context(cafile=conf.peers)
 
		self._ssl.check_hostname=False
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self._filename)
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 

	
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
			progress=Progress(localTree.leafCount)
 
			while len(nodeStack)>0:
 
				i=nodeStack.pop()
 
				outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 

	
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["index"]==i
 
				assert jsonData["dataType"]=="hash"
 
				stats.logExchangedNode()
 

	
 
				if localTree.store[i]!=binData:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
		progress.done()
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				outcoming.writeMsg(jsonData,binData)
 
				stats.logTransferredBlock()
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				i1=i2
 
				progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb+")
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 

	
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				dataFile.write(binData)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				stats.logTransferredBlock()
 
				i1=i2
 
				progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
src/morevna.py
Show inline comments
 
import sys
 
import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client
 
from server import Server
 

	
 

	
 
def _checkFile(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:",f,file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
 
	_checkFile(args.datafile)
 

	
 
	tree=HashTree.fromFile(args.datafile)
 
	tree.save(args.treefile)
 

	
 
def push(args):
 
	_checkFile(args.datafile)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	blocksToTransfer=c.negotiate()
 
	c.sendData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	blocksToTransfer=c.negotiate()
 
	c.pullData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Server(args.datafile,args.tree)
 
	try:
 
		spawnDaemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s",e)
 

	
 

	
 
parser=ArgumentParser()
 
subparsers=parser.add_subparsers()
 

	
 
pBuild=subparsers.add_parser("build")
 
pBuild.add_argument("treefile", help="stored hash tree location")
 
pBuild.add_argument("datafile")
 
pBuild.set_defaults(func=buildTree)
 

	
 
pUpdate=subparsers.add_parser("push")
 
pUpdate.add_argument("-p","--port",type=int)
 
pUpdate.add_argument("--host",default="127.0.0.1")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=push)
 

	
 
pUpdate=subparsers.add_parser("pull")
 
pUpdate.add_argument("-p","--port",type=int)
 
pUpdate.add_argument("--host",default="127.0.0.1")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=pull)
 

	
 
pServe=subparsers.add_parser("serve")
 
pServe.add_argument("-p","--port",type=int)
 
pServe.add_argument("--host",default="")
 
pServe.add_argument("-t","--tree",help="stored hash tree location")
 
pServe.add_argument("datafile")
 
pServe.set_defaults(func=serve)
 

	
 
args=parser.parse_args()
 
args.func(args)
0 comments (0 inline, 0 general)