Changeset - a4bfaabaeabb
[Not reviewed]
default
0 2 0
Laman - 7 years ago 2018-01-25 02:14:02

refactored data file optimizations from client and server to datafile
2 files changed with 13 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import status
 
import stats
 
from util import Progress
 
from hashtree import HashTree,hashBlock
 
from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException
 
from datafile import DataFile
 

	
 

	
 
class DeniedConnection(Exception): pass
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,host,port):
 
		super().__init__()
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 

	
 
		try:
 
			self._socket.connect((host,port))
 
		except (ConnectionRefusedError,OSError) as e:
 
			log.exception(e)
 
			print("Couldn't connect to {0}:{1}".format(host,port))
 
			raise FailedConnection()
 
		except ssl.SSLError as e:
 
			log.exception(e)
 
			print("Error creating SSL connection to {0}:{1}".format(host,port))
 
			raise FailedConnection()
 

	
 
		self.createNetworkers()
 
		print("Connected to {0}".format(host))
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self,filename,treeFile=""):
 
		print(datetime.now(), "initializing...")
 
		super().__init__(filename,treeFile)
 

	
 
	def init(self,action):
 
		jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		if jsonData["command"]=="deny":
 
			if jsonData["status"]==status.incompatible.version:
 
				raise DeniedConnection("Incompatible client version. Consider upgrading it.")
 
			raise DeniedConnection()
 
		assert jsonData["command"]=="init"
 
		if jsonData["version"]<conf.lowestCompatible:
 
			raise IncompatibleException("Incompatible server version. Consider upgrading it.")
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree=self._tree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			indices=[]
 
			for i in range(conf.batchSize.hash):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 

	
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode(len(indices))
 

	
 
			frontier=[]
 
			for (j,i) in enumerate(indices):
 
				(j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
					# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
 
					if 2*i+2<len(localTree.store): # inner node
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size=stats.formatBytes(len(blocksToTransfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 
		dataFile=DataFile.open(self._filename)
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 

	
 
		for k in range(0,len(blocksToTransfer),conf.batchSize.data):
 
			indices=[]
 
			blocks=[]
 
			for j in range(conf.batchSize.data):
 
				if k+j>=len(blocksToTransfer): break
 
				i2=blocksToTransfer[k+j]
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				block=dataFile.read(HashTree.BLOCK_SIZE)
 
				i=blocksToTransfer[k+j]
 
				block=dataFile.readFrom(i)
 

	
 
				indices.append(i2)
 
				indices.append(i)
 
				blocks.append(block)
 
				log.info("block #{0}: {1}...{2}".format(i2,block[:5],block[-5:]))
 
				log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:]))
 

	
 
				i1=i2
 
				progress.p(k+j)
 
			if indices: self._sendData(indices,blocks)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end","action":"push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer,ignoreLock=False):
 
		if not ignoreLock:
 
			try:
 
				self._lock()
 
			except LockedException:
 
				print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				return
 
		log.info(blocksToTransfer)
 
		dataFile=DataFile.open(self._filename, mode="rb+")
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress=Progress(len(blocksToTransfer))
 

	
 
		for k in range(0,len(blocksToTransfer),conf.batchSize.data):
 
			indices=blocksToTransfer[k:k+conf.batchSize.data]
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"})
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData
 
			for (j,i) in enumerate(indices):
 
				block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				dataFile.writeAt(i,block)
 

	
 
				if self._treeFile:
 
					self._newLeaves[i+self._tree.leafStart]=hashBlock(block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:]))
 

	
 
				stats.logTransferredBlock()
 
				progress.p(k+j)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		self._unlock()
 

	
 
		if self._treeFile:
 
			self._updateTree()
 

	
 
	def _sendData(self,indices,blocks):
 
		jsonData={"command":"send", "index":indices, "dataType":"data"}
 
		binData=b"".join(blocks)
 
		self._outcoming.writeMsg(jsonData,binData)
 
		stats.logTransferredBlock(len(indices))
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from netnode import BaseConnection,NetNode
 
import config as conf
 
import status
 
from datafile import DataFile
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,serverSocket,sslContext):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		self._socket=sslContext.wrap_socket(sock,server_side=True)
 

	
 
		log.info('Connected by {0}'.format(address))
 
		self.createNetworkers()
 

	
 

	
 
class Miniserver:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		p=None
 
		with self._ss:
 
			while True:
 
				connection=Connection(self._ss,self._ssl)
 
				if p and p.is_alive():
 
					with connection as c:
 
						c[0].readMsg()
 
						c[1].writeMsg({"command":"deny","status":status.locked})
 
					continue
 
				p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
 
				p.start()
 

	
 

	
 
class Server(NetNode):
 
	def __init__(self,connection,filename,treeFile=""):
 
		super().__init__(filename,treeFile)
 
		(self._incoming,self._outcoming)=connection
 

	
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 
		self._dataFileObj=None
 

	
 
	@staticmethod
 
	def run(connection,*args):
 
		with connection as c:
 
			s=Server(c,*args)
 
			s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
			self._dataFileHandle=open(self._filename, mode="rb+")
 
		return self._dataFileHandle
 
		if not self._dataFileObj:
 
			self._dataFileObj=DataFile.open(self._filename, mode="rb+")
 
		return self._dataFileObj
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
		except (AssertionError,ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters})
 
			if jsonData["version"]<conf.lowestCompatible:
 
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.version})
 
			if jsonData["action"]=="pull" and self.isLocked():
 
				self._outcoming.writeMsg({"command":"deny","status":status.locked})
 
			if jsonData["action"]=="push" and not self.isLocked():
 
				self._lock()
 

	
 
			self._outcoming.writeMsg({"command":"init", "version":conf.version})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			if jsonData.get("action")=="push": self._unlock()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,indices):
 
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes=[self._tree.store[i] for i in indices]
 

	
 
		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse=b"".join(hashes)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		blocks=[]
 
		for i in index:
 
			if self._lastIndex+1!=i:
 
				self._dataFile.seek(i*self.BLOCK_SIZE)
 
			blocks.append(self._dataFile.read(self.BLOCK_SIZE))
 
			blocks.append(self._dataFile.readFrom(i))
 

	
 
		return (jsonResponse,b"".join(blocks))
 

	
 
	def _receiveData(self,jsonData,binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		indices=jsonData["index"]
 
		for (i,k) in enumerate(indices):
 
			if self._lastIndex+1!=k:
 
				self._dataFile.seek(k*self.BLOCK_SIZE)
 
			block=binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._dataFile.write(block)
 
			self._lastIndex=k
 
			self._dataFile.writeAt(k,block)
 
			if self._treeFile:
 
				self._newLeaves[k+self._tree.leafStart]=hashBlock(block)
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileHandle=None
 
		self._dataFileObj=None
 
		if self._treeFile:
 
			self._updateTree()
 
		log.info("done")
0 comments (0 inline, 0 general)