Files @ 02ea4fed2520
Branch filter:

Location: Morevna/src/server.py

Laman
fixes, error handling
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf


class Connection(BaseConnection):
	def __init__(self,serverSocket,sslContext):
		super().__init__()

		sock, address = serverSocket.accept()
		self._socket=sslContext.wrap_socket(sock,server_side=True)

		log.info('Connected by {0}'.format(address))
		self.createNetworkers()


class Miniserver:
	def __init__(self,filename,treeFile=""):
		self._filename=filename
		self._treeFile=treeFile

		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
		self._ssl.verify_mode=ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		while True:
			with Connection(self._ss,self._ssl) as c:
				p=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
				p.start()
				p.join()


class Server(NetNode):
	def __init__(self,connection,filename,treeFile=""):
		super().__init__(filename,treeFile)
		(self._incoming,self._outcoming)=connection

		self.BLOCK_SIZE=self._tree.BLOCK_SIZE

		self._lastIndex=-1
		self._dataFileHandle=None

	@staticmethod
	def run(*args):
		s=Server(*args)
		s.serve()

	@property
	def _dataFile(self):
		if not self._dataFileHandle:
			self._dataFileHandle=open(self._filename, mode="rb+")
		return self._dataFileHandle

	def serve(self):
		try:
			while self._serveOne(): pass
		except (AssertionError,ConnectionResetError) as e:
			log.warning(e)

	def _serveOne(self):
		jsonData,binData=self._incoming.readMsg()

		if jsonData["command"]=="init":
			assert jsonData["blockSize"]==self.BLOCK_SIZE
			assert jsonData["blockCount"]==self._tree.leafCount
			self._outcoming.writeMsg({"command": "ack"})

		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))

		elif jsonData["command"]=="end":
			self._finalize()
			self._locked=False
			return False

		else:
			assert False, jsonData["command"]

		return True

	def _requestHash(self,indices):
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]

		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)

		return (jsonResponse,binResponse)

	def _requestData(self,index):
		log.info("received request for data block #{0}".format(index))

		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		if self._lastIndex+1!=index:
			self._dataFile.seek(index*self.BLOCK_SIZE)
		binResponse=self._dataFile.read(self.BLOCK_SIZE)

		return (jsonResponse,binResponse)

	def _receiveData(self,jsonData,binData):
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))

		i=jsonData["index"]
		if self._lastIndex+1!=i:
			self._dataFile.seek(i*self.BLOCK_SIZE)
		self._dataFile.write(binData)
		self._lastIndex=i
		if self._treeFile:
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)

		return ({"command": "ack", "index": i},)

	def _finalize(self):
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileHandle=None
		if self._treeFile:
			self._updateTree()
		log.info("done")