Changeset - 75e070b6b447
[Not reviewed]
default
0 2 1
Laman - 7 years ago 2017-10-19 14:00:10

refactored common client and server code into netnode
3 files changed with 64 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree,hashBlock
 
from networkers import NetworkReader,NetworkWriter
 
from netnode import BaseConnection,NetNode
 

	
 

	
 
class Connection:
 
class Connection(BaseConnection):
 
	def __init__(self):
 
		super().__init__()
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 
		self.createNetworkers()
 

	
 

	
 
class Client:
 
class Client(NetNode):
 
	def __init__(self,filename,treeFile=""):
 
		self._incoming=None
 
		self._outcoming=None
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		print(datetime.now(), "initializing...")
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		super().__init__(filename,treeFile)
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree=self._tree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
@@ -148,19 +129,16 @@ class Client:
 

	
 
			stats.logTransferredBlock()
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
		if self._treeFile:
 
			log.info("updating hash tree...")
 
			for (k,v) in self._newLeaves.items():
 
				self._tree.updateLeaf(k, v)
 
			self._tree.save(self._treeFile)
 
			self._updateTree()
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/netnode.py
Show inline comments
 
new file 100644
 
import socket
 
import logging as log
 

	
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
class BaseConnection: # abstract
 
	def __init__(self):
 
		self._socket=None
 
		self.incoming=None
 
		self.outcoming=None
 

	
 
	def createNetworkers(self):
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class NetNode:
 
	def __init__(self,filename,treeFile=""):
 
		self._incoming=None
 
		self._outcoming=None
 

	
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 

	
 
	def _updateTree(self):
 
		log.info("updating hash tree...")
 
		for (k,v) in self._newLeaves.items():
 
			self._tree.updateLeaf(k, v)
 
		self._tree.save(self._treeFile)
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import HashTree,hashBlock
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import hashBlock
 
from netnode import BaseConnection,NetNode
 
import config as conf
 

	
 

	
 
class Connection:
 
class Connection(BaseConnection):
 
	def __init__(self,serverSocket,sslContext):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		self._socket=sslContext.wrap_socket(sock,server_side=True)
 

	
 
		log.info('Connected by {0}'.format(address))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 
		self.createNetworkers()
 

	
 

	
 
class Miniserver:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as c:
 
				p=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
 
				p.start()
 
				p.join()
 

	
 

	
 
class Server:
 
class Server(NetNode):
 
	def __init__(self,connection,filename,treeFile=""):
 
		super().__init__(filename,treeFile)
 
		(self._incoming,self._outcoming)=connection
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 

	
 
	@staticmethod
 
	def run(*args):
 
		s=Server(*args)
 
		s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
@@ -139,17 +123,14 @@ class Server:
 
		self._dataFile.write(binData)
 
		self._lastIndex=i
 
		if self._treeFile:
 
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)
 

	
 
		return ({"command": "ack", "index": i},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileHandle=None
 
		if self._treeFile:
 
			log.info("updating hash tree...")
 
			for (k,v) in self._newLeaves.items():
 
				self._tree.updateLeaf(k, v)
 
			self._tree.save(self._treeFile)
 
			self._updateTree()
 
		log.info("done")
0 comments (0 inline, 0 general)