Changeset - 6c8e994fd906
[Not reviewed]
default
0 3 0
Laman - 7 years ago 2017-10-18 16:22:02

using just one socket, server saving memory when not serving
3 files changed with 111 insertions and 90 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -12,9 +12,13 @@ from networkers import NetworkReader,Net
 

	
 

	
 
class Connection:
 
	def __init__(self,sslContext):
 
	def __init__(self):
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 
		self._socket.connect((conf.hosts[0], conf.port))
 
		fr=self._socket.makefile(mode="rb")
 
@@ -33,44 +37,43 @@ class Connection:
 

	
 
class Client:
 
	def __init__(self,filename):
 
		self._incoming=None
 
		self._outcoming=None
 
		self._filename=filename
 

	
 
		self._ssl=ssl.create_default_context(cafile=conf.peers)
 
		self._ssl.check_hostname=False
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 
		print(datetime.now(), "initializing...")
 
		self._localTree=HashTree.fromFile(self._filename)
 

	
 
	def negotiate(self):
 
		print(datetime.now(), "initializing...")
 
		localTree=HashTree.fromFile(self._filename)
 
		localTree=self._localTree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
			outcoming.writeMsg(jsonData)
 
			jsonData,binData=incoming.readMsg()
 
			assert jsonData["command"]=="ack"
 
		jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack"
 

	
 
			# determine which blocks to send
 
			print(datetime.now(), "negotiating:")
 
			progress=Progress(localTree.leafCount)
 
			while len(nodeStack)>0:
 
				i=nodeStack.pop()
 
				outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			i=nodeStack.pop()
 
			self._outcoming.writeMsg({"command":"req", "index":i, "dataType":"hash"})
 

	
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["index"]==i
 
				assert jsonData["dataType"]=="hash"
 
				stats.logExchangedNode()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==i
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode()
 

	
 
				if localTree.store[i]!=binData:
 
					if 2*i+3<len(localTree.store): # inner node
 
						nodeStack.append(2*i+2)
 
						nodeStack.append(2*i+1)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			if localTree.store[i]!=binData:
 
				if 2*i+3<len(localTree.store): # inner node
 
					nodeStack.append(2*i+2)
 
					nodeStack.append(2*i+1)
 
				else:
 
					blocksToTransfer.append(i-localTree.leafStart) # leaf
 
					progress.p(i-localTree.leafStart)
 
		progress.done()
 

	
 
		return blocksToTransfer
 
@@ -81,26 +84,24 @@ class Client:
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				jsonData={"command":"send", "index":i2, "dataType":"data"}
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				binData=dataFile.read(HashTree.BLOCK_SIZE)
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			jsonData={"command":"send", "index":i2, "dataType":"data"}
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				outcoming.writeMsg(jsonData,binData)
 
				stats.logTransferredBlock()
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				i1=i2
 
				progress.p(k)
 
			self._outcoming.writeMsg(jsonData,binData)
 
			stats.logTransferredBlock()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
@@ -111,26 +112,27 @@ class Client:
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			progress=Progress(len(blocksToTransfer))
 
			for (k,i2) in enumerate(blocksToTransfer):
 
				outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
				jsonData,binData=incoming.readMsg()
 
				assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 

	
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				dataFile.write(binData)
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			dataFile.write(binData)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
				stats.logTransferredBlock()
 
				i1=i2
 
				progress.p(k)
 
			stats.logTransferredBlock()
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		with Connection(self._ssl) as (incoming,outcoming):
 
			outcoming.writeMsg({"command":"end"})
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/morevna.py
Show inline comments
 
@@ -7,8 +7,8 @@ from util import spawnDaemon
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client
 
from server import Server
 
from client import Client, Connection as ClientConnection
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
@@ -29,8 +29,10 @@ def push(args):
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	blocksToTransfer=c.negotiate()
 
	c.sendData(blocksToTransfer)
 
	with ClientConnection() as con:
 
		c.setConnection(con)
 
		blocksToTransfer=c.negotiate()
 
		c.sendData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
@@ -40,8 +42,10 @@ def pull(args):
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	blocksToTransfer=c.negotiate()
 
	c.pullData(blocksToTransfer)
 
	with ClientConnection() as con:
 
		c.setConnection(con)
 
		blocksToTransfer=c.negotiate()
 
		c.pullData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
@@ -52,7 +56,7 @@ def serve(args):
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Server(args.datafile,args.tree)
 
	s=Miniserver(args.datafile,args.tree)
 
	try:
 
		spawnDaemon(s.serve)
 
	except Exception as e:
src/server.py
Show inline comments
 
import hashlib
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import HashTree
 
@@ -28,19 +29,10 @@ class Connection:
 
		self._socket.close()
 

	
 

	
 
class Server:
 
class Miniserver:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 
		self._locked=False
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
@@ -50,9 +42,36 @@ class Server:
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as c:
 
				p=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
 
				p.start()
 
				p.join()
 

	
 

	
 
class Server:
 
	def __init__(self,connection,filename,treeFile=""):
 
		(self._incoming,self._outcoming)=connection
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 

	
 
	@staticmethod
 
	def run(*args):
 
		s=Server(*args)
 
		s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
@@ -60,31 +79,27 @@ class Server:
 
		return self._dataFileHandle
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as (incoming, outcoming):
 
				try:
 
					while self._serveOne(incoming,outcoming):
 
						pass
 
				except AssertionError:
 
					continue
 
		try:
 
			while self._serveOne(): pass
 
		except AssertionError as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			self._locked=True
 
			outcoming.writeMsg({"command": "ack"})
 
			self._outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
0 comments (0 inline, 0 general)