Files @ 0e02ff79165c
Branch filter:

Location: Morevna/src/server.py

Laman
fix: another error handled
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
	def __init__(self,serverSocket,sslContext):
		super().__init__()

		sock, address = serverSocket.accept()
		peer=sock.getpeername()
		try:
			self._socket=sslContext.wrap_socket(sock,server_side=True)
		except (ssl.SSLError,OSError) as e:
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
			raise e

		log.info('Connected by {0}'.format(address))
		self.createNetworkers()


class Miniserver:
	def __init__(self,filename,treeFile=""):
		self._filename=filename
		self._treeFile=treeFile

		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
		self._ssl.verify_mode=ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		p=None
		with self._ss:
			while True:
				try: connection=Connection(self._ss,self._ssl)
				except (ssl.SSLError,OSError): continue
				if p and p.is_alive():
					with connection as c:
						c[0].readMsg()
						c[1].writeMsg({"command":"deny","status":status.locked})
					continue
				p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
				p.start()


class Server(NetNode):
	def __init__(self,connection,filename,treeFile=""):
		super().__init__(filename,treeFile)
		(self._incoming,self._outcoming)=connection

		self.BLOCK_SIZE=self._tree.BLOCK_SIZE

		self._dataFileObj=None

	@staticmethod
	def run(connection,*args):
		with connection as c:
			s=Server(c,*args)
			s.serve()

	@property
	def _dataFile(self):
		if not self._dataFileObj:
			self._dataFileObj=DataFile.open(self._filename, mode="rb+")
		return self._dataFileObj

	def serve(self):
		try:
			while self._serveOne(): pass
		except (AssertionError,ConnectionResetError) as e:
			log.warning(e)

	def _serveOne(self):
		jsonData,binData=self._incoming.readMsg()

		if jsonData["command"]=="init":
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters})
			if jsonData["version"]<conf.lowestCompatible:
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.version})
			if jsonData["action"]=="pull" and self.isLocked():
				self._outcoming.writeMsg({"command":"deny","status":status.locked})
			if jsonData["action"]=="push" and not self.isLocked():
				self._lock()

			self._outcoming.writeMsg({"command":"init", "version":conf.version})

		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))

		elif jsonData["command"]=="end":
			self._finalize()
			if jsonData.get("action")=="push": self._unlock()
			return False

		else:
			assert False, jsonData["command"]

		return True

	def _requestHash(self,indices):
		log.info("received request for nodes {0}".format(indices))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]

		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)

		return (jsonResponse,binResponse)

	def _requestData(self,index):
		log.info("received request for data blocks {0}".format(index))

		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		blocks=[]
		for i in index:
			blocks.append(self._dataFile.readFrom(i))

		return (jsonResponse,b"".join(blocks))

	def _receiveData(self,jsonData,binData):
		if not self.isLocked(): self._lock()
		log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))

		indices=jsonData["index"]
		for (i,k) in enumerate(indices):
			block=binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._dataFile.writeAt(k,block)
			if self._treeFile:
				self._newLeaves[k+self._tree.leafStart]=hashBlock(block)

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileObj=None
		if self._treeFile:
			self._updateTree()
		log.info("done")