diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -3,7 +3,7 @@ import ssl import multiprocessing import logging as log -from hashtree import hashBlock +from hashtree import hash_block from netnode import BaseConnection,NetNode import config as conf import status @@ -11,25 +11,25 @@ from datafile import DataFile class Connection(BaseConnection): - def __init__(self, serverSocket, sslContext): + def __init__(self, server_socket, ssl_context): super().__init__() - sock, address = serverSocket.accept() + sock, address = server_socket.accept() peer = sock.getpeername() try: - self._socket = sslContext.wrap_socket(sock, server_side=True) + self._socket = ssl_context.wrap_socket(sock, server_side=True) except (ssl.SSLError,OSError) as e: log.warning("Failed to establish an SSL connection from {0}.".format(peer)) raise e log.info('Connected by {0}'.format(address)) - self.createNetworkers() + self.create_networkers() class Miniserver: - def __init__(self, filename, treeFile=""): + def __init__(self, filename, tree_file=""): self._filename = filename - self._treeFile = treeFile + self._tree_file = tree_file self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers) self._ssl.verify_mode = ssl.CERT_REQUIRED @@ -47,111 +47,111 @@ class Miniserver: except (ssl.SSLError, OSError): continue if p and p.is_alive(): with connection as c: - c[0].readMsg() - c[1].writeMsg({"command":"deny", "status":status.locked}) + c[0].read_msg() + c[1].write_msg({"command": "deny", "status":status.locked}) continue - p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._treeFile)) + p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._tree_file)) p.start() class Server(NetNode): - def __init__(self, connection, filename, treeFile=""): - super().__init__(filename, treeFile) + def __init__(self, connection, filename, tree_file=""): + super().__init__(filename, tree_file) (self._incoming, self._outcoming) = connection self.BLOCK_SIZE = self._tree.BLOCK_SIZE - self._dataFileObj = None + self._data_file_obj = None @staticmethod def run(connection, *args): with connection as c: - s = Server(c,*args) + s = Server(c, *args) s.serve() @property - def _dataFile(self): - if not self._dataFileObj: - self._dataFileObj = DataFile.open(self._filename, mode="rb+") - return self._dataFileObj + def _data_file(self): + if not self._data_file_obj: + self._data_file_obj = DataFile.open(self._filename, mode="rb+") + return self._data_file_obj def serve(self): try: - while self._serveOne(): pass + while self._serve_one(): pass except (AssertionError, ConnectionResetError) as e: log.warning(e) - def _serveOne(self): - jsonData, binData = self._incoming.readMsg() + def _serve_one(self): + json_data, bin_data = self._incoming.read_msg() - if jsonData["command"]=="init": - if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount: - self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.parameters}) - if jsonData["version"]<conf.lowestCompatible: - self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.version}) - if jsonData["action"]=="pull" and self.isLocked(): - self._outcoming.writeMsg({"command":"deny", "status":status.locked}) - if jsonData["action"]=="push" and not self.isLocked(): + if json_data["command"]=="init": + if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count: + self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters}) + if json_data["version"]<conf.lowest_compatible: + self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version}) + if json_data["action"]=="pull" and self.is_locked(): + self._outcoming.write_msg({"command": "deny", "status": status.locked}) + if json_data["action"]=="push" and not self.is_locked(): self._lock() - self._outcoming.writeMsg({"command":"init", "version":conf.version}) + self._outcoming.write_msg({"command": "init", "version": conf.version}) - elif jsonData["command"]=="req": - if jsonData["dataType"]=="data": - self._outcoming.writeMsg(*self._requestData(jsonData["index"])) + elif json_data["command"]=="req": + if json_data["dataType"]=="data": + self._outcoming.write_msg(*self._request_data(json_data["index"])) else: - self._outcoming.writeMsg(*self._requestHash(jsonData["index"])) + self._outcoming.write_msg(*self._request_hash(json_data["index"])) - elif jsonData["command"]=="send" and jsonData["dataType"]=="data": - self._outcoming.writeMsg(*self._receiveData(jsonData, binData)) + elif json_data["command"]=="send" and json_data["dataType"]=="data": + self._outcoming.write_msg(*self._receive_data(json_data, bin_data)) - elif jsonData["command"]=="end": + elif json_data["command"]=="end": self._finalize() - if jsonData.get("action")=="push": self._unlock() + if json_data.get("action")=="push": self._unlock() return False else: - assert False, jsonData["command"] + assert False, json_data["command"] return True - def _requestHash(self, indices): + def _request_hash(self, indices): log.info("received request for nodes {0}".format(indices)) assert all(i<len(self._tree.store) for i in indices) hashes = [self._tree.store[i] for i in indices] - jsonResponse = {"command":"send", "index":indices, "dataType":"hash"} - binResponse = b"".join(hashes) + json_response = {"command": "send", "index": indices, "dataType": "hash"} + bin_response = b"".join(hashes) - return (jsonResponse, binResponse) + return (json_response, bin_response) - def _requestData(self, index): + def _request_data(self, index): log.info("received request for data blocks {0}".format(index)) - jsonResponse = {"command":"send", "index":index, "dataType":"data"} + json_response = {"command": "send", "index": index, "dataType": "data"} blocks = [] for i in index: - blocks.append(self._dataFile.readFrom(i)) + blocks.append(self._data_file.read_from(i)) - return (jsonResponse, b"".join(blocks)) + return (json_response, b"".join(blocks)) - def _receiveData(self, jsonData, binData): - if not self.isLocked(): self._lock() - log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:])) + def _receive_data(self, json_data, bin_data): + if not self.is_locked(): self._lock() + log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:])) - indices = jsonData["index"] + indices = json_data["index"] for (i, k) in enumerate(indices): - block = binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE] - self._dataFile.writeAt(k, block) - if self._treeFile: - self._newLeaves[k+self._tree.leafStart] = hashBlock(block) + block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE] + self._data_file.write_at(k, block) + if self._tree_file: + self._new_leaves[k+self._tree.leaf_start] = hash_block(block) return ({"command": "ack", "index": indices},) def _finalize(self): log.info("closing session...") - self._dataFile.close() - self._dataFileObj = None - if self._treeFile: - self._updateTree() + self._data_file.close() + self._data_file_obj = None + if self._tree_file: + self._update_tree() log.info("done")