Files @ 5c80ca07f00c
Branch filter:

Location: Morevna/src/server.py

Laman
reformatted whitespace with more respect for PEP-8
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
	"""Server side of a client connection: one accepted socket wrapped in TLS."""

	def __init__(self, serverSocket, sslContext):
		"""Accept one pending client and perform the server-side TLS handshake.

		:param serverSocket: listening socket; accept() is called on it (blocks until a client arrives)
		:param sslContext: ssl.SSLContext used to wrap the accepted socket
		:raises ssl.SSLError, OSError: when accepting or wrapping the socket fails;
			the failure is logged and the accepted socket is released before re-raising
		"""
		super().__init__()

		# accept() already reports the peer address, no extra getpeername() call
		# (which could itself fail outside of the try block) is needed
		sock, address = serverSocket.accept()
		try:
			self._socket = sslContext.wrap_socket(sock, server_side=True)
		except (ssl.SSLError, OSError):
			log.warning("Failed to establish an SSL connection from {0}.".format(address))
			# make sure the accepted socket does not linger after a failed handshake
			sock.close()
			raise

		log.info('Connected by {0}'.format(address))
		self.createNetworkers()


class Miniserver:
	"""Listening front end which hands every accepted client to a worker process.

	At most one worker runs at a time; clients connecting while it is alive
	are answered with a "deny" message carrying status.locked.
	"""

	def __init__(self, filename, treeFile=""):
		"""Prepare the TLS context and start listening on conf.port.

		:param filename: passed on unchanged to Server for every session
		:param treeFile: passed on unchanged to Server for every session
		"""
		self._filename = filename
		self._treeFile = treeFile

		# clients have to present a certificate verifiable against conf.peers
		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
		self._ssl.verify_mode = ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		"""Accept clients in an endless loop.

		Failed connection attempts are skipped. The listening socket is closed
		when the loop is left by an exception.
		"""
		p = None
		with self._ss:
			while True:
				try:
					connection = Connection(self._ss, self._ssl)
				except (ssl.SSLError, OSError):
					continue

				if p and p.is_alive():
					# a session is already running: read the newcomer's opening
					# message and turn it down
					try:
						with connection as c:
							c[0].readMsg()
							c[1].writeMsg({"command":"deny", "status":status.locked})
					except (AssertionError, OSError) as e:
						# a misbehaving rejected client must not bring the listener down
						log.warning(e)
					continue

				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._treeFile))
				p.start()


class Server(NetNode):
	"""Handles a single client session over an already established connection.

	Messages are read from the incoming half of the connection and every one
	is answered on the outcoming half. Locking, the hash tree and its update
	(isLocked, _lock, _unlock, _tree, _newLeaves, _updateTree) are not defined
	in this class - presumably inherited from NetNode.
	"""

	def __init__(self, connection, filename, treeFile=""):
		"""
		:param connection: pair (incoming, outcoming) of message reader and writer
		:param filename: data file, opened lazily on first access
		:param treeFile: hash tree file; when empty, the tree is not updated from received data
		"""
		super().__init__(filename, treeFile)
		(self._incoming, self._outcoming) = connection

		self.BLOCK_SIZE = self._tree.BLOCK_SIZE

		# opened on demand by the _dataFile property
		self._dataFileObj = None

	@staticmethod
	def run(connection, *args):
		"""Process entry point: serve one session, leaving the connection's context afterwards.

		:param connection: context manager yielding the (incoming, outcoming) pair
		:param args: remaining Server constructor arguments (filename, treeFile)
		"""
		with connection as c:
			s = Server(c,*args)
			s.serve()

	@property
	def _dataFile(self):
		"""The data file opened in "rb+" mode; opened on first access and reused afterwards."""
		if not self._dataFileObj:
			self._dataFileObj = DataFile.open(self._filename, mode="rb+")
		return self._dataFileObj

	def serve(self):
		"""Process messages until the session ends.

		AssertionError (used for protocol violations) and ConnectionResetError
		end the session with a logged warning.
		"""
		try:
			while self._serveOne(): pass
		except (AssertionError, ConnectionResetError) as e:
			log.warning(e)

	def _serveOne(self):
		"""Read one message and answer it.

		:return: True when the session continues, False when it is over
			(the client sent "end" or its "init" was denied)
		:raises AssertionError: on an unknown command
		"""
		jsonData, binData = self._incoming.readMsg()

		if jsonData["command"]=="init":
			# exactly one reply is sent: the first failed check denies the
			# session and ends it, nothing else is done for a denied client
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.parameters})
				return False
			if jsonData["version"]<conf.lowestCompatible:
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.version})
				return False
			if jsonData["action"]=="pull" and self.isLocked():
				self._outcoming.writeMsg({"command":"deny", "status":status.locked})
				return False

			if jsonData["action"]=="push" and not self.isLocked():
				self._lock()

			self._outcoming.writeMsg({"command":"init", "version":conf.version})

		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData, binData))

		elif jsonData["command"]=="end":
			self._finalize()
			if jsonData.get("action")=="push": self._unlock()
			return False

		else:
			# raised explicitly, an assert statement would vanish under python -O
			raise AssertionError(jsonData["command"])

		return True

	def _requestHash(self, indices):
		"""Build a response carrying the stored hashes of the requested tree nodes.

		:param indices: indices into self._tree.store
		:return: (json part, binary part) - the binary part is the hashes joined in request order
		:raises AssertionError: when an index lies beyond the end of the store
		"""
		log.info("received request for nodes {0}".format(indices))
		# raised explicitly, an assert statement would vanish under python -O
		if not all(i<len(self._tree.store) for i in indices):
			raise AssertionError("requested nodes out of range: {0}".format(indices))
		hashes = [self._tree.store[i] for i in indices]

		jsonResponse = {"command":"send", "index":indices, "dataType":"hash"}
		binResponse = b"".join(hashes)

		return (jsonResponse, binResponse)

	def _requestData(self, index):
		"""Build a response carrying the requested data blocks.

		:param index: iterable of block indices
		:return: (json part, binary part) - the binary part is the blocks joined in request order
		"""
		log.info("received request for data blocks {0}".format(index))

		jsonResponse = {"command":"send", "index":index, "dataType":"data"}
		blocks = [self._dataFile.readFrom(i) for i in index]

		return (jsonResponse, b"".join(blocks))

	def _receiveData(self, jsonData, binData):
		"""Write received blocks into the data file and acknowledge them.

		:param jsonData: message header; jsonData["index"] lists the target block indices
		:param binData: the blocks concatenated in the order of the indices, BLOCK_SIZE bytes apiece
		:return: one-item tuple with the "ack" message
		"""
		if not self.isLocked(): self._lock()
		log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))

		indices = jsonData["index"]
		for (i, k) in enumerate(indices):
			block = binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._dataFile.writeAt(k, block)
			if self._treeFile:
				# remember the new leaf hash for the tree update in _finalize
				self._newLeaves[k+self._tree.leafStart] = hashBlock(block)

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		"""Close the data file (if the session opened it) and update the hash tree when a tree file is set."""
		log.info("closing session...")
		# checked directly: going through the _dataFile property would open
		# an untouched file just to close it again
		if self._dataFileObj:
			self._dataFileObj.close()
			self._dataFileObj = None
		if self._treeFile:
			self._updateTree()
		log.info("done")