diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -8,8 +8,8 @@ import config as conf import status import stats from util import Progress -from hashtree import HashTree,hashBlock -from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException +from hashtree import HashTree, hashBlock +from netnode import BaseConnection, NetNode, FailedConnection, LockedException, IncompatibleException from datafile import DataFile @@ -17,25 +17,25 @@ class DeniedConnection(Exception): pass class Connection(BaseConnection): - def __init__(self,host,port): + def __init__(self, host, port): super().__init__() - sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sslContext=ssl.create_default_context(cafile=conf.peers) - sslContext.check_hostname=False - sslContext.load_cert_chain(conf.certfile,conf.keyfile) + sslContext = ssl.create_default_context(cafile=conf.peers) + sslContext.check_hostname = False + sslContext.load_cert_chain(conf.certfile, conf.keyfile) - self._socket=sslContext.wrap_socket(sock) + self._socket = sslContext.wrap_socket(sock) try: - self._socket.connect((host,port)) - except (ConnectionRefusedError,OSError) as e: + self._socket.connect((host, port)) + except (ConnectionRefusedError, OSError) as e: log.exception(e) - print("Couldn't connect to {0}:{1}".format(host,port)) + print("Couldn't connect to {0}:{1}".format(host, port)) raise FailedConnection() except ssl.SSLError as e: log.exception(e) - print("Error creating SSL connection to {0}:{1}".format(host,port)) + print("Error creating SSL connection to {0}:{1}".format(host, port)) raise FailedConnection() self.createNetworkers() @@ -43,14 +43,20 @@ class Connection(BaseConnection): class Client(NetNode): - def __init__(self,filename,treeFile=""): + def __init__(self, filename, treeFile=""): print(datetime.now(), "initializing...") - super().__init__(filename,treeFile) + super().__init__(filename, treeFile) - def init(self,action): - jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action} + def init(self, action): + jsonData = { + "command": "init", + "blockSize": self._tree.BLOCK_SIZE, + "blockCount": self._tree.leafCount, + "version": conf.version, + "action": action + } self._outcoming.writeMsg(jsonData) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() if jsonData["command"]=="deny": if jsonData["status"]==status.incompatible.version: raise DeniedConnection("Incompatible client version. Consider upgrading it.") @@ -65,28 +71,28 @@ class Client(NetNode): # # Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order. def negotiate(self): - localTree=self._tree - blocksToTransfer=[] - nodeStack=collections.deque([0]) # root + localTree = self._tree + blocksToTransfer = [] + nodeStack = collections.deque([0]) # root # determine which blocks to send print(datetime.now(), "negotiating:") - progress=Progress(localTree.leafCount) + progress = Progress(localTree.leafCount) while len(nodeStack)>0: - indices=[] + indices = [] for i in range(conf.batchSize.hash): indices.append(nodeStack.pop()) if len(nodeStack)==0: break self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"}) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["index"]==indices assert jsonData["dataType"]=="hash" stats.logExchangedNode(len(indices)) - frontier=[] - for (j,i) in enumerate(indices): - (j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)] + frontier = [] + for (j, i) in enumerate(indices): + (j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)] if localTree.store[i]!=binData[j1:j2]: # ie. 0-6 nodes, 7-14 leaves. 2*6+2<15 if 2*i+2=len(blocksToTransfer): break - i=blocksToTransfer[k+j] - block=dataFile.readFrom(i) + i = blocksToTransfer[k+j] + block = dataFile.readFrom(i) indices.append(i) blocks.append(block) - log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) progress.p(k+j) - if indices: self._sendData(indices,blocks) + if indices: self._sendData(indices, blocks) progress.done() - self._outcoming.writeMsg({"command":"end","action":"push"}) + self._outcoming.writeMsg({"command":"end", "action":"push"}) log.info("closing session...") dataFile.close() - def pullData(self,blocksToTransfer,ignoreLock=False): + def pullData(self, blocksToTransfer, ignoreLock=False): if not ignoreLock: try: self._lock() @@ -139,24 +145,24 @@ class Client(NetNode): print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") return log.info(blocksToTransfer) - dataFile=DataFile.open(self._filename, mode="rb+") + dataFile = DataFile.open(self._filename, mode="rb+") print(datetime.now(), "receiving data:") - progress=Progress(len(blocksToTransfer)) + progress = Progress(len(blocksToTransfer)) - for k in range(0,len(blocksToTransfer),conf.batchSize.data): - indices=blocksToTransfer[k:k+conf.batchSize.data] + for k in range(0, len(blocksToTransfer), conf.batchSize.data): + indices = blocksToTransfer[k:k+conf.batchSize.data] self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"}) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData - for (j,i) in enumerate(indices): - block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] - dataFile.writeAt(i,block) + for (j, i) in enumerate(indices): + block = binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] + dataFile.writeAt(i, block) if self._treeFile: - self._newLeaves[i+self._tree.leafStart]=hashBlock(block) + self._newLeaves[i+self._tree.leafStart] = hashBlock(block) - log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) stats.logTransferredBlock() progress.p(k+j) @@ -171,13 +177,13 @@ class Client(NetNode): if self._treeFile: self._updateTree() - def _sendData(self,indices,blocks): - jsonData={"command":"send", "index":indices, "dataType":"data"} - binData=b"".join(blocks) - self._outcoming.writeMsg(jsonData,binData) + def _sendData(self, indices, blocks): + jsonData = {"command":"send", "index":indices, "dataType":"data"} + binData = b"".join(blocks) + self._outcoming.writeMsg(jsonData, binData) stats.logTransferredBlock(len(indices)) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData - def setConnection(self,connection): - (self._incoming,self._outcoming)=connection + def setConnection(self, connection): + (self._incoming, self._outcoming) = connection