"""TLS-protected block server.

``Miniserver`` listens on ``conf.port`` and hands every accepted TLS
connection to a ``Server`` running in a child process.  While such a child
is alive, further clients are answered with a ``deny``/``locked`` message.
"""

from datetime import datetime
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hash_block
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
    """Server-side connection: accept one client and wrap it in TLS."""

    def __init__(self, server_socket, ssl_context):
        """Accept a pending client on *server_socket* and start TLS.

        Args:
            server_socket: Listening socket; ``accept()`` is called on it.
            ssl_context: ``ssl.SSLContext`` used for the server-side wrap.

        Raises:
            ssl.SSLError, OSError: If accepting, querying the peer address or
                wrapping the socket fails.  The accepted socket is closed
                before the exception propagates.
        """
        super().__init__()
        sock, address = server_socket.accept()
        try:
            peer = sock.getpeername()
        except OSError:
            # The client may already be gone; close deterministically instead
            # of leaving the descriptor to the garbage collector.
            sock.close()
            raise
        try:
            self._socket = ssl_context.wrap_socket(sock, server_side=True)
        except (ssl.SSLError, OSError):
            log.warning("Failed to establish an SSL connection from {0}.".format(peer))
            # Harmless if wrap_socket() already detached/closed the socket.
            sock.close()
            raise
        log.info('Connected by {0}'.format(address))
        self.create_networkers()


class Miniserver:
    """Accept loop that serves one client at a time from a child process."""

    def __init__(self, filename, tree_file=""):
        """Create the TLS context and the listening socket.

        Args:
            filename: Path handed unchanged to each ``Server``.
            tree_file: Optional tree-file path handed unchanged to each
                ``Server``; empty string means "none".

        Raises:
            OSError: If binding or listening on ``conf.port`` fails; the
                socket is closed before the exception propagates.
        """
        self._filename = filename
        self._tree_file = tree_file
        # Clients must present a certificate that verifies against conf.peers.
        self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,
                                               cafile=conf.peers)
        self._ssl.verify_mode = ssl.CERT_REQUIRED
        self._ssl.load_cert_chain(conf.certfile, conf.keyfile)
        self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._ss.bind(("", conf.port))
            self._ss.listen(1)
        except OSError:
            self._ss.close()
            raise

    def serve(self):
        """Accept clients forever; the listening socket is closed on exit.

        Each accepted connection is served by ``Server.run`` in a new
        process.  While the most recently started process is alive, new
        clients are refused with ``status.locked``.
        """
        worker = None
        with self._ss:
            while True:
                try:
                    connection = Connection(self._ss, self._ssl)
                except (ssl.SSLError, OSError):
                    # Failed accept/handshake: drop this client, keep serving.
                    continue
                if worker is not None and worker.is_alive():
                    self._deny(connection)
                    continue
                worker = multiprocessing.Process(
                    target=Server.run,
                    args=(connection, self._filename, self._tree_file))
                worker.start()
                # NOTE(review): the parent keeps its own reference to
                # `connection` after start(); whether it should be released
                # here depends on BaseConnection, which is not visible.

    @staticmethod
    def _deny(connection):
        """Read one message from *connection* and answer it with a deny.

        Errors from the exchange are logged instead of propagated so that a
        misbehaving client cannot terminate the accept loop.  The caught
        types mirror those handled in ``Server.serve`` (``ConnectionResetError``
        and ``ssl.SSLError`` are ``OSError`` subclasses).
        """
        try:
            with connection as c:
                c[0].read_msg()
                c[1].write_msg({"command": "deny", "status": status.locked})
        except (AssertionError, OSError) as e:
            log.warning(e)


class Server(NetNode):
    """Per-connection request handler executed inside the child process."""

    def __init__(self, connection, filename, tree_file=""):
        """Bind the handler to an entered connection.

        Args:
            connection: Two-item iterable unpacked into the incoming and the
                outgoing message channel, in that order.
            filename: Data file path, forwarded to ``NetNode.__init__``.
            tree_file: Tree file path, forwarded to ``NetNode.__init__``.
        """
        super().__init__(filename, tree_file)
        (self._incoming, self._outcoming) = connection
        # self._tree is presumably created by NetNode.__init__ -- TODO confirm.
        self.BLOCK_SIZE = self._tree.BLOCK_SIZE
        self._data_file_obj = None   # opened lazily by the _data_file property
        self._last_flushed = 0

    @staticmethod
    def run(connection, *args):
        """Process entry point: enter *connection* and serve it until done.

        Args:
            connection: Context manager yielding the channel pair.
            *args: Forwarded to ``Server(...)`` after the channel pair.
        """
        with connection as c:
            s = Server(c, *args)
            s.serve()

    @property
    def _data_file(self):
        """Return the data file, opening it in ``rb+`` mode on first use."""
        # Compare against None explicitly: a DataFile that happens to be
        # falsy must not be reopened on every access.
        if self._data_file_obj is None:
            self._data_file_obj = DataFile.open(self._filename, mode="rb+")
        return self._data_file_obj

    def serve(self):
        """Handle requests until ``_serve_one`` returns a falsy value.

        ``AssertionError`` and ``ConnectionResetError`` end the session and
        are logged as warnings.
        """
        try:
            while self._serve_one():
                pass
        except (AssertionError, ConnectionResetError) as e:
            log.warning(e)

    def _serve_one(self):
        """Read one message and act on it.

        NOTE(review): only the beginning of this method survived in the
        text under review; see the comment at the point of damage.
        """
        json_data, bin_data = self._incoming.read_msg()
        if json_data["command"] == "init":
            if (json_data["blockSize"] != self.BLOCK_SIZE
                    or json_data["blockCount"] != self._tree.leaf_count):
                self._outcoming.write_msg(
                    {"command": "deny",
                     "status": status.incompatible.parameters})

        # NOTE(review): SOURCE IS CORRUPTED FROM HERE.  The received text
        # continues as follows (line breaks added, characters unchanged):
        #
        #   if json_data["version"]=60:
        #   self._data_file.flush()
        #   if self._tree_file: self._update_tree()
        #   self._refresh_lock()
        #   self._last_flushed = t
        #   return ({"command": "ack", "index": indices},)
        #
        # Presumably everything between a "<" after json_data["version"] and
        # a ">" before "=60" was stripped, which removed the rest of this
        # method, any methods in between (_update_tree and _refresh_lock are
        # referenced but not visible) and the head of the method that owns
        # the flush/ack tail.  Restore the span from version control; it is
        # deliberately not reconstructed here.
        raise NotImplementedError(
            "Server._serve_one: source text lost, restore from version control")

    def _finalize(self):
        """Close the data file if it is open and refresh the tree file."""
        log.info("closing session...")
        # Close only an existing handle; going through the lazy property
        # would open the file merely to close it again.
        if self._data_file_obj is not None:
            self._data_file_obj.close()
            self._data_file_obj = None
        if self._tree_file:
            self._update_tree()
        log.info("done")