Changeset - 9f2b0a4f3538
[Not reviewed]
default
0 6 0
Laman - 7 years ago 2017-10-31 19:45:36

push to multiple servers
6 files changed with 18 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import sys
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree,hashBlock
 
from netnode import BaseConnection,NetNode
 

	
 

	
 
class Connection(BaseConnection):
	"""Client side of a TLS connection to one server.

	The connection is established in the constructor. On success the
	reader/writer pair is created through `createNetworkers()`.
	"""

	def __init__(self,host,port):
		"""Connect to the server listening at `host`:`port`.

		The TLS context trusts the certificates in `conf.peers`, presents the
		local certificate from `conf.certfile` / `conf.keyfile` and does not
		check the server hostname.

		A refused connection is reported on stdout and terminates the program
		with exit status 1.
		"""
		super().__init__()
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)

		sslContext=ssl.create_default_context(cafile=conf.peers)
		sslContext.check_hostname=False
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)

		self._socket=sslContext.wrap_socket(sock)
		try:
			self._socket.connect((host,port))
		except ConnectionRefusedError:
			# report the address that was actually tried, not the first configured one
			print("Couldn't connect to {0}:{1}".format(host,port))
			sys.exit(1)

		self.createNetworkers()
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self,filename,treeFile=""):
		"""Announce the initialization on stdout, then set the node up through the parent class."""
		startedAt=datetime.now()
		print(startedAt, "initializing...")
		super().__init__(filename,treeFile)
 

	
 
	def negotiate(self):
		"""Ask the server for node hashes to determine which blocks are to be transferred.

		Uses a binary HashTree, where the item at k is a hash of the items at
		2k+1 and 2k+2. The nodes are requested in the order of a batch DFS,
		at most `conf.batchSize` of them per request, which needs a stack of
		size O(treeDepth*batchSize). Nodes in each tree level are accessed in
		order.

		Returns the list of differing leaves, as indices relative to the
		tree's `leafStart`.
		"""
		tree=self._tree
		differingBlocks=[]
		pending=collections.deque([0]) # root

		# initialize session
		self._outcoming.writeMsg({
			"command":"init",
			"blockSize":tree.BLOCK_SIZE,
			"blockCount":tree.leafCount,
			"version":conf.version
		})
		(reply,payload)=self._incoming.readMsg()
		assert reply["command"]=="ack"

		# determine which blocks to send
		print(datetime.now(), "negotiating:")
		progress=Progress(tree.leafCount)
		hashLen=HashTree.HASH_LEN
		while pending:
			# take up to a batch of nodes from the top of the stack
			batch=[pending.pop() for _ in range(min(conf.batchSize,len(pending)))]
			self._outcoming.writeMsg({"command":"req", "index":batch, "dataType":"hash"})

			(reply,payload)=self._incoming.readMsg()
			assert reply["index"]==batch
			assert reply["dataType"]=="hash"
			stats.logExchangedNode(len(batch))

			children=[]
			for (position,node) in enumerate(batch):
				# the response carries the hashes concatenated in the order of the request
				offset=hashLen*position
				if tree.store[node]==payload[offset:offset+hashLen]:
					continue # matching hash, the whole subtree is skipped
				# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
				if 2*node+2<len(tree.store): # inner node
					children+=[2*node+1, 2*node+2]
				else: # leaf
					block=node-tree.leafStart
					differingBlocks.append(block)
					progress.p(block)
			# reversed, so that the leftmost child ends on the top of the stack
			pending.extend(reversed(children))
		progress.done()

		size=stats.formatBytes(len(differingBlocks)*self._tree.BLOCK_SIZE)
		print(datetime.now(), "{0} to transfer".format(size))

		return differingBlocks
 

	
 
	def sendData(self,blocksToTransfer):
		"""Send the listed data blocks to the server and end the session.

		Each block is read from the local data file, sent with a "send"
		command and has to be acknowledged by the server with the same index
		before the next one is sent. An "end" command is written after the
		last block.

		blocksToTransfer -- block indices, as returned by `negotiate()`

		Raises AssertionError on an unexpected server response.
		"""
		log.info(blocksToTransfer)
		# the context manager closes the file even when the transfer fails halfway
		with open(self._filename, mode="rb") as dataFile:
			i1=-1 # index of the previously read block

			print(datetime.now(), "sending data:")
			progress=Progress(len(blocksToTransfer))
			for (k,i2) in enumerate(blocksToTransfer):
				jsonData={"command":"send", "index":i2, "dataType":"data"}
				if i1+1!=i2: # not a consecutive block, the file position has to be moved
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
				binData=dataFile.read(HashTree.BLOCK_SIZE)

				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))

				self._outcoming.writeMsg(jsonData,binData)
				stats.logTransferredBlock()
				jsonData,binData=self._incoming.readMsg()
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
				i1=i2
				progress.p(k)
			progress.done()

			self._outcoming.writeMsg({"command":"end"})

			log.info("closing session...")
 

	
 
	def pullData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb+")
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
src/morevna.py
Show inline comments
 
import sys
 
import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon
 
from util import spawnDaemon, splitHost
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:",f,file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
	"""Build the hash tree of `args.datafile` and save it to `args.treefile`.

	The build is skipped when the tree file already exists and is newer than
	the data file, unless `args.force` is set.
	"""
	_checkFile(args.datafile)

	if os.path.isfile(args.treefile):
		treeModified=os.stat(args.treefile).st_mtime
		dataModified=os.stat(args.datafile).st_mtime
		upToDate=dataModified<treeModified
		if upToDate and not args.force:
			print("tree file is up to date")
			return

	HashTree.fromFile(args.datafile).save(args.treefile)
 

	
 
def push(args):
	"""Push the local data file to every configured server, one after another.

	`args.host`, when given, replaces the configured list of hosts and
	`args.port` overrides the configured default port. A host may carry its
	own port as "address:port", otherwise the default port is used.
	The transfer statistics are printed at the end.
	"""
	_checkFile(args.datafile)
	if args.tree:
		_checkFile(args.tree)
	if args.host: conf.hosts=[args.host]
	if args.port: conf.port=args.port

	c=Client(args.datafile,args.tree)
	for host in conf.hosts:
		# a separate connection and negotiation for each server
		with ClientConnection(*splitHost(host,conf.port)) as con:
			c.setConnection(con)
			blocksToTransfer=c.negotiate()
			c.sendData(blocksToTransfer)
	print()
	print(stats.report())
 

	
 
def pull(args):
	"""Pull the differing blocks from a server into the local data file.

	`args.host`, when given, replaces the configured list of hosts and
	`args.port` overrides the configured default port. Only the first host
	is contacted; it may carry its own port as "address:port".
	The transfer statistics are printed at the end.
	"""
	_checkFile(args.datafile)
	if args.tree:
		_checkFile(args.tree)
	if args.host: conf.hosts=[args.host]
	if args.port: conf.port=args.port

	c=Client(args.datafile,args.tree)
	with ClientConnection(*splitHost(conf.hosts[0],conf.port)) as con:
		c.setConnection(con)
		blocksToTransfer=c.negotiate()
		c.pullData(blocksToTransfer)
	print()
	print(stats.report())
 

	
 
def serve(args):
	"""Start a server for `args.datafile` as a daemon.

	`args.host` is put to the front of the configured hosts and `args.port`
	overrides the configured port. Any exception coming out of the daemon
	start is logged with its traceback and not propagated.
	"""
	_checkFile(args.datafile)
	treeFile=args.tree
	if treeFile:
		_checkFile(treeFile)
	if args.host:
		conf.hosts.insert(0,args.host)
	if args.port:
		conf.port=args.port

	server=Miniserver(args.datafile,treeFile)
	try:
		spawnDaemon(server.serve)
	except Exception as e:
		log.exception("exception: %s",e)
 

	
 

	
 
# command line interface: one sub-command per operation, each bound to its handler through `func`
parser=ArgumentParser()
subparsers=parser.add_subparsers()

pBuild=subparsers.add_parser("build")
pBuild.add_argument("-f","--force",action="store_true",help="force tree rebuild")
pBuild.add_argument("treefile", help="stored hash tree location")
pBuild.add_argument("datafile")
pBuild.set_defaults(func=buildTree)

pUpdate=subparsers.add_parser("push")
pUpdate.add_argument("-p","--port",type=int)
pUpdate.add_argument("--host")
pUpdate.add_argument("-t","--tree",help="stored hash tree location")
pUpdate.add_argument("datafile")
pUpdate.set_defaults(func=push)

pUpdate=subparsers.add_parser("pull")
pUpdate.add_argument("-p","--port",type=int)
pUpdate.add_argument("--host",default="127.0.0.1")
pUpdate.add_argument("-t","--tree",help="stored hash tree location")
pUpdate.add_argument("datafile")
pUpdate.set_defaults(func=pull)

pServe=subparsers.add_parser("serve")
pServe.add_argument("-p","--port",type=int)
pServe.add_argument("--host")
pServe.add_argument("-t","--tree",help="stored hash tree location")
pServe.add_argument("datafile")
pServe.set_defaults(func=serve)

args=parser.parse_args()
# sub-commands are optional for argparse, so `func` is missing when none was given;
# report a usage error (exit status 2) instead of failing with an AttributeError
if not hasattr(args,"func"):
	parser.error("no command specified")
args.func(args)
src/netnode.py
Show inline comments
 
import socket
 
import logging as log
 

	
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
class BaseConnection: # abstract
	"""Common part of the client and server connections, usable as a context manager.

	A subclass is expected to set `self._socket` to a connected socket and
	then call `createNetworkers()`. Entering the context yields the
	(reader, writer) pair, leaving it shuts the socket down and closes it.
	"""

	def __init__(self):
		self._socket=None
		self.incoming=None
		self.outcoming=None

	def createNetworkers(self):
		"""Wrap the socket into a binary reader and a binary writer."""
		fr=self._socket.makefile(mode="rb")
		fw=self._socket.makefile(mode="wb")

		self.incoming=NetworkReader(fr)
		self.outcoming=NetworkWriter(fw)

	def __enter__(self):
		return self.incoming,self.outcoming

	def __exit__(self, exc_type, exc_val, exc_tb):
		"""Shut the socket down and close it.

		An OSError raised on the way is logged as a warning and not
		propagated. Exceptions from the `with` body are not suppressed.
		"""
		try:
			try:
				self._socket.shutdown(socket.SHUT_RDWR)
			finally:
				# shutdown fails when the peer is already gone; the socket has to be released regardless
				self._socket.close()
		except OSError:
			log.warning("broken connection")
 

	
 

	
 
class NetNode:
	"""State shared by both ends of a session: the data file, its hash tree and the networkers."""

	def __init__(self,filename,treeFile=""):
		"""Remember the file names and obtain the hash tree.

		The tree is loaded from `treeFile` when one is given, otherwise it is
		computed from the data file.
		"""
		self._incoming=None
		self._outcoming=None

		self._filename=filename
		self._treeFile=treeFile

		self._tree=HashTree.load(treeFile) if treeFile else HashTree.fromFile(filename)

		# hashes of the changed leaves, waiting to be written into the tree
		self._newLeaves={}

	def _updateTree(self):
		"""Write the collected leaf hashes into the tree and save it to the tree file."""
		log.info("updating hash tree...")
		for (index,leafHash) in self._newLeaves.items():
			self._tree.updateLeaf(index, leafHash)
		self._tree.save(self._treeFile)
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from netnode import BaseConnection,NetNode
 
import config as conf
 

	
 

	
 
class Connection(BaseConnection):
	"""Server side of a TLS connection with one client."""

	def __init__(self,serverSocket,sslContext):
		"""Accept a client on `serverSocket` and wrap its socket with `sslContext`.

		Blocks until a client connects.
		"""
		super().__init__()

		(plainSocket,peer)=serverSocket.accept()
		self._socket=sslContext.wrap_socket(plainSocket,server_side=True)

		log.info('Connected by {0}'.format(peer))
		self.createNetworkers()
 

	
 

	
 
class Miniserver:
	"""Listens on the configured port and handles the clients one at a time."""

	def __init__(self,filename,treeFile=""):
		"""Prepare the TLS context and start listening on `conf.port` on all interfaces.

		The context requires a client certificate, verified against `conf.peers`.
		"""
		self._filename=filename
		self._treeFile=treeFile

		context=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
		context.verify_mode=ssl.CERT_REQUIRED
		context.load_cert_chain(conf.certfile,conf.keyfile)
		self._ssl=context

		listener=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		listener.bind(("", conf.port))
		listener.listen(1)
		self._ss=listener

	def serve(self):
		"""Accept the connections forever, serving each in its own process.

		The process is waited for, so the next client is not accepted before
		the current session is over.
		"""
		while True:
			with Connection(self._ss,self._ssl) as c:
				worker=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
				worker.start()
				worker.join()
 

	
 

	
 
class Server(NetNode):
	"""Handles one client session over an already established connection."""

	def __init__(self,connection,filename,treeFile=""):
		"""
		connection -- the (reader, writer) pair the messages are exchanged through
		filename -- the served data file
		treeFile -- the stored hash tree; when empty, the parent class computes the tree from the data file
		"""
		super().__init__(filename,treeFile)
		(self._incoming,self._outcoming)=connection

		self.BLOCK_SIZE=self._tree.BLOCK_SIZE

		# index of the block read or written last, used to skip the seek for consecutive blocks
		self._lastIndex=-1
		self._dataFileHandle=None

	@staticmethod
	def run(*args):
		"""Create a server from `args` and serve the session. Entry point for a separate process."""
		s=Server(*args)
		s.serve()

	@property
	def _dataFile(self):
		"""The data file opened for reading and writing. It is opened on the first access."""
		if not self._dataFileHandle:
			self._dataFileHandle=open(self._filename, mode="rb+")
		return self._dataFileHandle

	def serve(self):
		"""Process the incoming messages until the client ends the session.

		A failed assertion (protocol violation) or a connection reset ends
		the session with a logged warning.
		"""
		try:
			while self._serveOne(): pass
		except (AssertionError,ConnectionResetError) as e:
			log.warning(e)

	def _serveOne(self):
		"""Read one message and respond to it.

		Returns False after the "end" command, True otherwise.
		Raises AssertionError on an unknown command or mismatched session parameters.
		"""
		jsonData,binData=self._incoming.readMsg()

		if jsonData["command"]=="init":
			assert jsonData["blockSize"]==self.BLOCK_SIZE
			assert jsonData["blockCount"]==self._tree.leafCount
			self._outcoming.writeMsg({"command": "ack"})

		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))

		elif jsonData["command"]=="end":
			self._finalize()
			# NOTE(review): `_locked` is not read anywhere in the visible code -- confirm it is still needed
			self._locked=False
			return False

		else:
			assert False, jsonData["command"]

		return True

	def _requestHash(self,indices):
		"""Build the response with the hashes of the tree nodes at `indices`, concatenated in the requested order."""
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]

		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)

		return (jsonResponse,binResponse)

	def _requestData(self,index):
		"""Build the response with the contents of the data block at `index`."""
		log.info("received request for data block #{0}".format(index))

		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		if self._lastIndex+1!=index:
			self._dataFile.seek(index*self.BLOCK_SIZE)
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
		# keep track of the file position; without this a later request would be
		# checked against a stale index and could read from a wrong offset
		self._lastIndex=index

		return (jsonResponse,binResponse)

	def _receiveData(self,jsonData,binData):
		"""Write the received block into the data file and build the acknowledgement.

		With a tree file in use, the hash of the block is remembered for the tree update.
		"""
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))

		i=jsonData["index"]
		if self._lastIndex+1!=i:
			self._dataFile.seek(i*self.BLOCK_SIZE)
		self._dataFile.write(binData)
		self._lastIndex=i
		if self._treeFile:
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)

		return ({"command": "ack", "index": i},)

	def _finalize(self):
		"""Close the data file and, with a tree file in use, update and save the tree."""
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileHandle=None
		# a reopened file starts at the beginning again
		self._lastIndex=-1
		if self._treeFile:
			self._updateTree()
		log.info("done")
src/tests/test_overall.py
Show inline comments
 
import os
 
import sys
 
import shutil
 
import hashlib
 
import multiprocessing
 
from logging import FileHandler
 
from unittest import TestCase
 

	
 
import config
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection
 
from server import Miniserver
 

	
 

	
 
# replace the handler set up in `config` with one writing into a file
config.logger.removeHandler(config.handler)
handler=FileHandler("/tmp/morevna.log")
handler.setFormatter(config.formatter)
config.logger.addHandler(handler)

# override the configured batch size for the tests
config.batchSize=8

# directory with the test images and the path of the working copy the tests operate on
dataDir=os.path.join(config.directory,"src/tests/data")
filename=os.path.join(dataDir,"test.img")
 

	
 

	
 
def compareFiles(f1,f2):
	"""Compute the SHA-256 hex digests of the two files.

	Returns the pair (digest of `f2`, digest of `f1`), ready to be unpacked
	into an equality assertion.
	"""
	digests=[]
	for path in (f1,f2):
		with open(path,mode="rb") as f:
			digests.append(hashlib.sha256(f.read()).hexdigest())
	(first,second)=digests
	return (second,first)
 

	
 

	
 
class TestMorevna(TestCase):
	"""End to end tests: building a tree, pushing to and pulling from a local server.

	Standard output is redirected into a log file for the duration of the
	test class. Each test starts with a fresh copy of test1.img as the
	working file.
	"""
	_stdout=None

	@classmethod
	def setUpClass(cls):
		cls._stdout=sys.stdout
		sys.stdout=open("/tmp/morevna-stdout.log",mode="a")

	def setUp(self):
		src=os.path.join(dataDir,"test1.img")
		shutil.copyfile(src,filename)

	@classmethod
	def tearDownClass(cls):
		os.remove(filename)
		sys.stdout.close()
		sys.stdout=cls._stdout

	def test_build(self):
		"""A freshly built tree is identical to the stored reference tree."""
		treeFile=os.path.join(dataDir,"test.bin")
		refFile=os.path.join(dataDir,"test1.bin")

		tree=HashTree.fromFile(os.path.join(dataDir,"test1.img"))
		tree.save(treeFile)

		try:
			self.assertEqual(*compareFiles(refFile,treeFile))
		finally:
			# do not leave the temporary tree behind when the comparison fails
			os.remove(treeFile)

	def test_push(self):
		"""After a push, the server file is identical to the client file."""
		ms=Miniserver(filename)
		p=multiprocessing.Process(target=ms.serve)
		p.start()

		try:
			for clientFile in ("test2.img","test3.img","test4.img"):
				clientFile=os.path.join(dataDir,clientFile)
				c=Client(clientFile)
				with ClientConnection("127.0.0.1",config.port) as con:
					c.setConnection(con)
					blocksToTransfer=c.negotiate()
					c.sendData(blocksToTransfer)

				self.assertEqual(*compareFiles(clientFile,filename))
		finally:
			# stop the server even after a failure, otherwise it keeps running and holding the port
			p.terminate()
			p.join()

	def test_pull(self):
		"""After a pull, the client file is identical to the server file."""
		serverFile=os.path.join(dataDir,"test3.img")
		ms=Miniserver(serverFile)
		p=multiprocessing.Process(target=ms.serve)
		p.start()

		try:
			c=Client(filename)
			with ClientConnection("127.0.0.1",config.port) as con:
				c.setConnection(con)
				blocksToTransfer=c.negotiate()
				c.pullData(blocksToTransfer)

			self.assertEqual(*compareFiles(serverFile,filename))
		finally:
			# stop the server even after a failure, otherwise it keeps running and holding the port
			p.terminate()
			p.join()
src/util.py
Show inline comments
 
import os
 
import sys
 

	
 

	
 
def spawnDaemon(fun):
	"""Run `fun` in a daemonized grandchild process.

	The calling process returns right after the first fork and keeps
	running. The intermediate child starts a new session, forks again,
	prints the pid of the grandchild and exits. The grandchild calls `fun()`
	and then exits without returning to the caller.

	A failed fork is reported on stderr and ends the process with exit status 1.
	"""
	# do the UNIX double-fork magic, see Stevens' "Advanced
	# Programming in the UNIX Environment" for details (ISBN 0201563177)
	try:
		pid = os.fork()
		if pid > 0:
			# parent process, return and keep running
			return
	except OSError as e:
		print("fork #1 failed: {0} ({1})".format(e.errno,e.strerror),file=sys.stderr)
		sys.exit(1)

	# only the first child gets here; put it into a session of its own
	os.setsid()

	# do second fork
	try:
		pid = os.fork()
		if pid > 0:
			# exit from second parent
			print("[{0}] server running".format(pid))
			sys.exit(0)
	except OSError as e:
		print("fork #2 failed: {0} ({1})".format(e.errno,e.strerror),file=sys.stderr)
		sys.exit(1)

	# only the grandchild gets here
	fun()

	# all done
	# os._exit ends the process immediately, so the control never returns into the caller's code
	os._exit(os.EX_OK)
 

	
 

	
 
def splitHost(host,defaultPort=0):
	"""Split a "address:port" string into an (address, port) pair.

	host -- the address, optionally followed by a colon and a port number
	defaultPort -- returned as the port when `host` does not specify one

	The port parsed from `host` is converted to an int, as that is what the
	socket functions expect; `defaultPort` is returned as it is.

	Raises ValueError when the port in `host` is not a number.
	"""
	address,_,port=host.partition(":")
	if not port:
		return (address,defaultPort)
	return (address,int(port))
 

	
 

	
 
class Progress:
	"""Prints the progress of a run from `i0` towards `n` as a percentage on a single console line."""

	def __init__(self,n,i0=0):
		"""
		n -- the end of the range (exclusive)
		i0 -- the start of the range; may be greater than `n` for a descending run
		"""
		self._n=n
		self._i0=i0
		self._i=i0
		self._last=""

	def p(self,i):
		"""Print the percentage reached at the step `i`, unless it is the same as the one printed last.

		The output ends with a carriage return, so that the next one overwrites it.
		"""
		(start,end)=(self._i0,self._n)

		assert start<=i<end or end<i<=start, (start,i,end)
		text="{0}%".format(Progress._p(i,end,start))
		if text==self._last:
			return
		print(text,end="\r")
		self._last=text

	def done(self):
		"""Print the final 100% and move to a new line."""
		print("100%")

	@staticmethod
	def _p(i,n,i0):
		"""Compute the whole percentage of the range from `i0` to `n` covered once the step `i` is finished."""
		if n>=i0:
			step=1
		else:
			step=-1
		finished=i+step-i0
		return 100*finished//(n-i0)
0 comments (0 inline, 0 general)