import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
    """Accept one client on the listening socket and wrap it in TLS."""

    def __init__(self, serverSocket, sslContext):
        super().__init__()

        sock, address = serverSocket.accept()
        peer = sock.getpeername()

        try:
            self._socket = sslContext.wrap_socket(sock, server_side=True)
        except (ssl.SSLError, OSError):
            log.warning("Failed to establish an SSL connection from {0}.".format(peer))
            raise

        log.info("Connected by {0}".format(address))
        self.createNetworkers()


class Miniserver:
    """Listen for clients and hand each one to a single worker process."""

    def __init__(self, filename, treeFile=""):
        self._filename = filename
        self._treeFile = treeFile

        # Clients must present a certificate signed by one of the known peers.
        self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
        self._ssl.verify_mode = ssl.CERT_REQUIRED
        self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

        self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._ss.bind(("", conf.port))
        self._ss.listen(1)

    def serve(self):
        p = None
        with self._ss:
            while True:
                try:
                    connection = Connection(self._ss, self._ssl)
                except (ssl.SSLError, OSError):
                    continue

                # Only one session at a time: deny newcomers while a worker is alive.
                if p and p.is_alive():
                    with connection as c:
                        c[0].readMsg()
                        c[1].writeMsg({"command": "deny", "status": status.locked})
                    continue

                p = multiprocessing.Process(
                    target=Server.run,
                    args=(connection, self._filename, self._treeFile),
                )
                p.start()


class Server(NetNode):
    def __init__(self, connection, filename, treeFile=""):
        super().__init__(filename, treeFile)
        (self._incoming, self._outcoming) = connection

        self.BLOCK_SIZE = self._tree.BLOCK_SIZE

        self._dataFileObj = None

    @staticmethod
    def run(connection, *args):
        with connection as c:
            s = Server(c, *args)
            s.serve()

    @property
    def _dataFile(self):
        # Open the data file lazily, on first access.
        if not self._dataFileObj:
            self._dataFileObj = DataFile.open(self._filename, mode="rb+")
        return self._dataFileObj

    def serve(self):
        try:
            while self._serveOne():
                pass
        except (AssertionError, ConnectionResetError) as e:
            log.warning(e)

    def _serveOne(self):
        """Handle one message. Return False once the session is over."""
        jsonData, binData = self._incoming.readMsg()

        if jsonData["command"] == "init":
            # A denied handshake ends the session; no "init" reply follows it.
            if jsonData["blockSize"] != self.BLOCK_SIZE or jsonData["blockCount"] != self._tree.leafCount:
                self._outcoming.writeMsg({"command": "deny", "status": status.incompatible.parameters})
                return False
            if jsonData["version"] < conf.lowestCompatible:
                self._outcoming.writeMsg({"command": "deny", "status": status.incompatible.version})
                return False
            if jsonData["action"] == "pull" and self.isLocked():
                self._outcoming.writeMsg({"command": "deny", "status": status.locked})
                return False
            if jsonData["action"] == "push" and not self.isLocked():
                self._lock()

            self._outcoming.writeMsg({"command": "init", "version": conf.version})

        elif jsonData["command"] == "req":
            if jsonData["dataType"] == "data":
                self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
            else:
                self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

        elif jsonData["command"] == "send" and jsonData["dataType"] == "data":
            self._outcoming.writeMsg(*self._receiveData(jsonData, binData))

        elif jsonData["command"] == "end":
            self._finalize()
            if jsonData.get("action") == "push":
                self._unlock()
            return False

        else:
            # Unknown command; serve() catches the AssertionError and logs it.
            raise AssertionError(jsonData["command"])

        return True

    def _requestHash(self, indices):
        log.info("received request for nodes {0}".format(indices))
        assert all(i < len(self._tree.store) for i in indices)
        hashes = [self._tree.store[i] for i in indices]

        jsonResponse = {"command": "send", "index": indices, "dataType": "hash"}
        binResponse = b"".join(hashes)

        return (jsonResponse, binResponse)

    def _requestData(self, index):
        log.info("received request for data blocks {0}".format(index))

        jsonResponse = {"command": "send", "index": index, "dataType": "data"}
        blocks = []
        for i in index:
            blocks.append(self._dataFile.readFrom(i))

        return (jsonResponse, b"".join(blocks))

    def _receiveData(self, jsonData, binData):
        if not self.isLocked():
            self._lock()
        log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))

        indices = jsonData["index"]
        # binData holds the blocks back to back, in the same order as indices.
        for (i, k) in enumerate(indices):
            block = binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
            self._dataFile.writeAt(k, block)
            if self._treeFile:
                self._newLeaves[k+self._tree.leafStart] = hashBlock(block)

        return ({"command": "ack", "index": indices},)

    def _finalize(self):
        log.info("closing session...")
        self._dataFile.close()
        self._dataFileObj = None
        if self._treeFile:
            self._updateTree()
        log.info("done")
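

# Illustrative sketch, not part of the original module: how the binary payload
# of a "send" message might be cut into fixed-size blocks, mirroring the
# slicing done in Server._receiveData. The name splitBlocks and its signature
# are assumptions made for this example only.
def splitBlocks(binData, indices, blockSize):
    """Pair each block index with its slice of the payload."""
    return [
        (k, binData[i*blockSize:(i+1)*blockSize])
        for (i, k) in enumerate(indices)
    ]

# Usage: splitBlocks(b"aaaabbbb", [7, 2], 4) pairs block 7 with b"aaaa"
# and block 2 with b"bbbb". The position in the payload follows the order of
# the index list, not the numeric value of each index.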