Changeset - 6c8e994fd906
[Not reviewed]
default
0 3 0
Laman - 7 years ago 2017-10-18 16:22:02

using just one socket, server saving memory when not serving
3 files changed with 68 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -9,15 +9,19 @@ import stats
 
from util import Progress
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 

	
 

	
 
class Connection:
 
	def __init__(self,sslContext):
 
	def __init__(self):
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
@@ -30,39 +34,38 @@ class Connection:
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self._incoming=None
 
		self._outcoming=None
 
		self._filename=filename
 

	
 
		self._ssl=ssl.create_default_context(cafile=conf.peers)
 
		self._ssl.check_hostname=False
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 
		print(datetime.now(), "initializing...")
 
		self._localTree=HashTree.fromFile(self._filename)
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self._filename)
 
		localTree=self._localTree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 

	
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
			progress=Progress(localTree.leafCount)
 
			while len(nodeStack)>0:
 
				i=nodeStack.pop()
 
				outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 
			self._outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 

	
 
				jsonData,binData=incoming.readMsg()
 
			jsonData,binData=self._incoming.readMsg()
 
				assert jsonData["index"]==i
 
				assert jsonData["dataType"]=="hash"
 
				stats.logExchangedNode()
 

	
 
				if localTree.store[i]!=binData:
 
					if 2*i+3<len(localTree.store): # inner node
 
@@ -78,47 +81,44 @@ class Client:
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				outcoming.writeMsg(jsonData,binData)
 
			self._outcoming.writeMsg(jsonData,binData)
 
				stats.logTransferredBlock()
 
				jsonData,binData=incoming.readMsg()
 
			jsonData,binData=self._incoming.readMsg()
 
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				i1=i2
 
				progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb+")
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
				jsonData,binData=incoming.readMsg()
 
			self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
			jsonData,binData=self._incoming.readMsg()
 
				assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 

	
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				dataFile.write(binData)
 

	
 
@@ -126,11 +126,13 @@ class Client:
 

	
 
				stats.logTransferredBlock()
 
				i1=i2
 
				progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/morevna.py
Show inline comments
 
@@ -4,14 +4,14 @@ import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client
 
from server import Server
 
from client import Client, Connection as ClientConnection
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:",f,file=sys.stderr)
 
		sys.exit(1)
 
@@ -26,36 +26,40 @@ def buildTree(args):
 
def push(args):
 
	_checkFile(args.datafile)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	with ClientConnection() as con:
 
		c.setConnection(con)
 
	blocksToTransfer=c.negotiate()
 
	c.sendData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	with ClientConnection() as con:
 
		c.setConnection(con)
 
	blocksToTransfer=c.negotiate()
 
	c.pullData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Server(args.datafile,args.tree)
 
	s=Miniserver(args.datafile,args.tree)
 
	try:
 
		spawnDaemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s",e)
 

	
 

	
src/server.py
Show inline comments
 
import hashlib
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 
import config as conf
 

	
 
@@ -25,69 +26,83 @@ class Connection:
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class Server:
 
class Miniserver:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 
		self._locked=False
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as c:
 
				p=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
 
				p.start()
 
				p.join()
 

	
 

	
 
class Server:
 
	def __init__(self,connection,filename,treeFile=""):
 
		(self._incoming,self._outcoming)=connection
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 

	
 
	@staticmethod
 
	def run(*args):
 
		s=Server(*args)
 
		s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
			self._dataFileHandle=open(self._filename, mode="rb+")
 
		return self._dataFileHandle
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as (incoming, outcoming):
 
				try:
 
					while self._serveOne(incoming,outcoming):
 
						pass
 
				except AssertionError:
 
					continue
 
			while self._serveOne(): pass
 
		except AssertionError as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			self._locked=True
 
			outcoming.writeMsg({"command": "ack"})
 
			self._outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			self._locked=False
 
			return False
 

	
0 comments (0 inline, 0 general)