diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -8,7 +8,7 @@ import config as conf import status import stats from util import Progress -from hashtree import HashTree, hashBlock +from hashtree import HashTree, hash_block from netnode import BaseConnection, NetNode, FailedConnection, LockedException, IncompatibleException from datafile import DataFile @@ -21,11 +21,11 @@ class Connection(BaseConnection): super().__init__() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sslContext = ssl.create_default_context(cafile=conf.peers) - sslContext.check_hostname = False - sslContext.load_cert_chain(conf.certfile, conf.keyfile) + ssl_context = ssl.create_default_context(cafile=conf.peers) + ssl_context.check_hostname = False + ssl_context.load_cert_chain(conf.certfile, conf.keyfile) - self._socket = sslContext.wrap_socket(sock) + self._socket = ssl_context.wrap_socket(sock) try: self._socket.connect((host, port)) @@ -38,152 +38,153 @@ class Connection(BaseConnection): print("Error creating SSL connection to {0}:{1}".format(host, port)) raise FailedConnection() - self.createNetworkers() + self.create_networkers() print("Connected to {0}".format(host)) class Client(NetNode): - def __init__(self, filename, treeFile=""): + def __init__(self, filename, tree_file=""): print(datetime.now(), "initializing...") - super().__init__(filename, treeFile) + super().__init__(filename, tree_file) def init(self, action): - jsonData = { + json_data = { "command": "init", "blockSize": self._tree.BLOCK_SIZE, - "blockCount": self._tree.leafCount, + "blockCount": self._tree.leaf_count, "version": conf.version, "action": action } - self._outcoming.writeMsg(jsonData) - jsonData, binData = self._incoming.readMsg() - if jsonData["command"]=="deny": - if jsonData["status"]==status.incompatible.version: + self._outcoming.write_msg(json_data) + json_data, bin_data = self._incoming.read_msg() + if json_data["command"] == "deny": + if json_data["status"] == status.incompatible.version: raise DeniedConnection("Incompatible client version. Consider upgrading it.") raise DeniedConnection() - assert jsonData["command"]=="init" - if jsonData["version"]0: + progress = Progress(local_tree.leaf_count) + while len(node_stack) > 0: indices = [] - for i in range(conf.batchSize.hash): - indices.append(nodeStack.pop()) - if len(nodeStack)==0: break - self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"}) + for i in range(conf.batch_size.hash): + indices.append(node_stack.pop()) + if len(node_stack) == 0: break + self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "hash"}) - jsonData, binData = self._incoming.readMsg() - assert jsonData["index"]==indices - assert jsonData["dataType"]=="hash" - stats.logExchangedNode(len(indices)) + json_data, bin_data = self._incoming.read_msg() + assert json_data["index"] == indices + assert json_data["dataType"] == "hash" + stats.log_exchanged_node(len(indices)) frontier = [] for (j, i) in enumerate(indices): (j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)] - if localTree.store[i]!=binData[j1:j2]: + if local_tree.store[i] != bin_data[j1:j2]: # ie. 0-6 nodes, 7-14 leaves. 2*6+2<15 - if 2*i+2=len(blocksToTransfer): break - i = blocksToTransfer[k+j] - block = dataFile.readFrom(i) + for j in range(conf.batch_size.data): + if k+j >= len(blocks_to_transfer): break + i = blocks_to_transfer[k+j] + block = data_file.read_from(i) indices.append(i) blocks.append(block) log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) progress.p(k+j) - if indices: self._sendData(indices, blocks) + if indices: self._send_data(indices, blocks) progress.done() - self._outcoming.writeMsg({"command":"end", "action":"push"}) + self._outcoming.write_msg({"command": "end", "action": "push"}) log.info("closing session...") - dataFile.close() + data_file.close() - def pullData(self, blocksToTransfer, ignoreLock=False): - if not ignoreLock: + def pull_data(self, blocks_to_transfer, ignore_lock=False): + if not ignore_lock: try: self._lock() except LockedException: - print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") + print( + "The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") return - log.info(blocksToTransfer) - dataFile = DataFile.open(self._filename, mode="rb+") + log.info(blocks_to_transfer) + data_file = DataFile.open(self._filename, mode="rb+") print(datetime.now(), "receiving data:") - progress = Progress(len(blocksToTransfer)) + progress = Progress(len(blocks_to_transfer)) - for k in range(0, len(blocksToTransfer), conf.batchSize.data): - indices = blocksToTransfer[k:k+conf.batchSize.data] - self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"}) - jsonData, binData = self._incoming.readMsg() - assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData + for k in range(0, len(blocks_to_transfer), conf.batch_size.data): + indices = blocks_to_transfer[k:k+conf.batch_size.data] + self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"}) + json_data, bin_data = self._incoming.read_msg() + assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data for (j, i) in enumerate(indices): - block = binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] - dataFile.writeAt(i, block) + block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] + data_file.write_at(i, block) - if self._treeFile: - self._newLeaves[i+self._tree.leafStart] = hashBlock(block) + if self._tree_file: + self._new_leaves[i+self._tree.leaf_start] = hash_block(block) log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) - stats.logTransferredBlock() + stats.log_transferred_block() progress.p(k+j) progress.done() - self._outcoming.writeMsg({"command":"end"}) + self._outcoming.write_msg({"command": "end"}) log.info("closing session...") - dataFile.close() + data_file.close() self._unlock() - if self._treeFile: - self._updateTree() + if self._tree_file: + self._update_tree() - def _sendData(self, indices, blocks): - jsonData = {"command":"send", "index":indices, "dataType":"data"} - binData = b"".join(blocks) - self._outcoming.writeMsg(jsonData, binData) - stats.logTransferredBlock(len(indices)) - jsonData, binData = self._incoming.readMsg() - assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData + def _send_data(self, indices, blocks): + json_data = {"command": "send", "index": indices, "dataType": "data"} + bin_data = b"".join(blocks) + self._outcoming.write_msg(json_data, bin_data) + stats.log_transferred_block(len(indices)) + json_data, bin_data = self._incoming.read_msg() + assert json_data["command"]=="ack" and json_data["index"]==indices, json_data - def setConnection(self, connection): + def set_connection(self, connection): (self._incoming, self._outcoming) = connection