diff --git a/src/benchmark.py b/src/benchmark.py --- a/src/benchmark.py +++ b/src/benchmark.py @@ -6,49 +6,49 @@ from hashtree import HashTree def timeF(f): - start=time() + start = time() f() - end=time() - print((end-start),"s") + end = time() + print((end-start), "s") def fullRead(): - block=True - with open("/home/laman/ext2.img",mode="rb") as f: + block = True + with open("/home/laman/ext2.img", mode="rb") as f: while block: - block=f.read(HashTree.BLOCK_SIZE) + block = f.read(HashTree.BLOCK_SIZE) def selectedRead(): with open("/home/laman/blocks.txt") as f: - blocks=[int(x) for x in f] - with open("/home/laman/ext2.img",mode="rb") as f: - i1=-1 + blocks = [int(x) for x in f] + with open("/home/laman/ext2.img", mode="rb") as f: + i1 = -1 for i2 in blocks: if i1+1!=i2: f.seek(i2*HashTree.BLOCK_SIZE) - block=f.read(HashTree.BLOCK_SIZE) - i1=i2 + block = f.read(HashTree.BLOCK_SIZE) + i1 = i2 def lessSelectedRead(): with open("/home/laman/blocks.txt") as f: - blocks=[int(x) for x in f] - with open("/home/laman/ext2.img",mode="rb") as f: - i1=-1 + blocks = [int(x) for x in f] + with open("/home/laman/ext2.img", mode="rb") as f: + i1 = -1 for i2 in blocks: if i2<=i1+8: - block=f.read(HashTree.BLOCK_SIZE*(i2-i1)) + block = f.read(HashTree.BLOCK_SIZE*(i2-i1)) else: f.seek(i2*HashTree.BLOCK_SIZE) - block=f.read(HashTree.BLOCK_SIZE) - i1=i2 + block = f.read(HashTree.BLOCK_SIZE) + i1 = i2 def shortSockets(): def _server(): - serverSock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serverSock.bind(("",12329)) + serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serverSock.bind(("", 12329)) serverSock.listen(1) for i in range(10000): @@ -61,7 +61,7 @@ def shortSockets(): def _client(): for i in range(10000): - sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(("127.0.0.1", 12329)) with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw: fw.write(b"x"*4096+b"\n") @@ -69,9 +69,9 @@ def shortSockets(): sock.shutdown(socket.SHUT_RDWR) sock.close() - s=threading.Thread(target=_server) + s = threading.Thread(target=_server) s.start() - c=threading.Thread(target=_client) + c = threading.Thread(target=_client) c.start() s.join() c.join() @@ -79,8 +79,8 @@ def shortSockets(): def longSockets(): def _server(): - serverSock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serverSock.bind(("",12330)) + serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serverSock.bind(("", 12330)) serverSock.listen(1) sock, address = serverSock.accept() @@ -93,7 +93,7 @@ def longSockets(): serverSock.close() def _client(): - sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(("127.0.0.1", 12330)) with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw: for i in range(10000): @@ -102,9 +102,9 @@ def longSockets(): sock.shutdown(socket.SHUT_RDWR) sock.close() - s=threading.Thread(target=_server) + s = threading.Thread(target=_server) s.start() - c=threading.Thread(target=_client) + c = threading.Thread(target=_client) c.start() s.join() c.join() diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -8,8 +8,8 @@ import config as conf import status import stats from util import Progress -from hashtree import HashTree,hashBlock -from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException +from hashtree import HashTree, hashBlock +from netnode import BaseConnection, NetNode, FailedConnection, LockedException, IncompatibleException from datafile import DataFile @@ -17,25 +17,25 @@ class DeniedConnection(Exception): pass class Connection(BaseConnection): - def __init__(self,host,port): + def __init__(self, host, port): super().__init__() - sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sslContext=ssl.create_default_context(cafile=conf.peers) - sslContext.check_hostname=False - sslContext.load_cert_chain(conf.certfile,conf.keyfile) + sslContext = ssl.create_default_context(cafile=conf.peers) + sslContext.check_hostname = False + sslContext.load_cert_chain(conf.certfile, conf.keyfile) - self._socket=sslContext.wrap_socket(sock) + self._socket = sslContext.wrap_socket(sock) try: - self._socket.connect((host,port)) - except (ConnectionRefusedError,OSError) as e: + self._socket.connect((host, port)) + except (ConnectionRefusedError, OSError) as e: log.exception(e) - print("Couldn't connect to {0}:{1}".format(host,port)) + print("Couldn't connect to {0}:{1}".format(host, port)) raise FailedConnection() except ssl.SSLError as e: log.exception(e) - print("Error creating SSL connection to {0}:{1}".format(host,port)) + print("Error creating SSL connection to {0}:{1}".format(host, port)) raise FailedConnection() self.createNetworkers() @@ -43,14 +43,20 @@ class Connection(BaseConnection): class Client(NetNode): - def __init__(self,filename,treeFile=""): + def __init__(self, filename, treeFile=""): print(datetime.now(), "initializing...") - super().__init__(filename,treeFile) + super().__init__(filename, treeFile) - def init(self,action): - jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action} + def init(self, action): + jsonData = { + "command": "init", + "blockSize": self._tree.BLOCK_SIZE, + "blockCount": self._tree.leafCount, + "version": conf.version, + "action": action + } self._outcoming.writeMsg(jsonData) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() if jsonData["command"]=="deny": if jsonData["status"]==status.incompatible.version: raise DeniedConnection("Incompatible client version. Consider upgrading it.") @@ -65,28 +71,28 @@ class Client(NetNode): # # Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order. def negotiate(self): - localTree=self._tree - blocksToTransfer=[] - nodeStack=collections.deque([0]) # root + localTree = self._tree + blocksToTransfer = [] + nodeStack = collections.deque([0]) # root # determine which blocks to send print(datetime.now(), "negotiating:") - progress=Progress(localTree.leafCount) + progress = Progress(localTree.leafCount) while len(nodeStack)>0: - indices=[] + indices = [] for i in range(conf.batchSize.hash): indices.append(nodeStack.pop()) if len(nodeStack)==0: break self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"}) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["index"]==indices assert jsonData["dataType"]=="hash" stats.logExchangedNode(len(indices)) - frontier=[] - for (j,i) in enumerate(indices): - (j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)] + frontier = [] + for (j, i) in enumerate(indices): + (j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)] if localTree.store[i]!=binData[j1:j2]: # ie. 0-6 nodes, 7-14 leaves. 2*6+2<15 if 2*i+2=len(blocksToTransfer): break - i=blocksToTransfer[k+j] - block=dataFile.readFrom(i) + i = blocksToTransfer[k+j] + block = dataFile.readFrom(i) indices.append(i) blocks.append(block) - log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) progress.p(k+j) - if indices: self._sendData(indices,blocks) + if indices: self._sendData(indices, blocks) progress.done() - self._outcoming.writeMsg({"command":"end","action":"push"}) + self._outcoming.writeMsg({"command":"end", "action":"push"}) log.info("closing session...") dataFile.close() - def pullData(self,blocksToTransfer,ignoreLock=False): + def pullData(self, blocksToTransfer, ignoreLock=False): if not ignoreLock: try: self._lock() @@ -139,24 +145,24 @@ class Client(NetNode): print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") return log.info(blocksToTransfer) - dataFile=DataFile.open(self._filename, mode="rb+") + dataFile = DataFile.open(self._filename, mode="rb+") print(datetime.now(), "receiving data:") - progress=Progress(len(blocksToTransfer)) + progress = Progress(len(blocksToTransfer)) - for k in range(0,len(blocksToTransfer),conf.batchSize.data): - indices=blocksToTransfer[k:k+conf.batchSize.data] + for k in range(0, len(blocksToTransfer), conf.batchSize.data): + indices = blocksToTransfer[k:k+conf.batchSize.data] self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"}) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData - for (j,i) in enumerate(indices): - block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] - dataFile.writeAt(i,block) + for (j, i) in enumerate(indices): + block = binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] + dataFile.writeAt(i, block) if self._treeFile: - self._newLeaves[i+self._tree.leafStart]=hashBlock(block) + self._newLeaves[i+self._tree.leafStart] = hashBlock(block) - log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) stats.logTransferredBlock() progress.p(k+j) @@ -171,13 +177,13 @@ class Client(NetNode): if self._treeFile: self._updateTree() - def _sendData(self,indices,blocks): - jsonData={"command":"send", "index":indices, "dataType":"data"} - binData=b"".join(blocks) - self._outcoming.writeMsg(jsonData,binData) + def _sendData(self, indices, blocks): + jsonData = {"command":"send", "index":indices, "dataType":"data"} + binData = b"".join(blocks) + self._outcoming.writeMsg(jsonData, binData) stats.logTransferredBlock(len(indices)) - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData - def setConnection(self,connection): - (self._incoming,self._outcoming)=connection + def setConnection(self, connection): + (self._incoming, self._outcoming) = connection diff --git a/src/config.py b/src/config.py --- a/src/config.py +++ b/src/config.py @@ -12,24 +12,24 @@ if os.path.isfile(configFile): logFile = conf.get("logFile", "/var/log/morevna/mor.log") -logger=log.getLogger() +logger = log.getLogger() logger.setLevel(log.INFO) -formatter=log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S") +formatter = log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S") handler = TimedRotatingFileHandler(logFile, when="midnight", backupCount=9) handler.setFormatter(formatter) logger.addHandler(handler) -certfile=os.path.join(directory,"certs/cert.pem") -keyfile=os.path.join(directory,"certs/key.pem") -peers=os.path.join(directory,"certs/peers.pem") +certfile = os.path.join(directory,"certs/cert.pem") +keyfile = os.path.join(directory,"certs/key.pem") +peers = os.path.join(directory,"certs/peers.pem") -version=[0,1,1] -lowestCompatible=[0,1,0] # tuple is more fitting but json conversion transforms it into a list anyway +version = [0, 1, 1] +lowestCompatible=[0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway -hosts=conf.get("hosts",["127.0.0.1"]) -port=conf.get("port",9901) +hosts=conf.get("hosts", ["127.0.0.1"]) +port=conf.get("port", 9901) -bSize=conf.get("batchSize",dict()) +bSize=conf.get("batchSize", dict()) class batchSize: - hash=bSize.get("hash",256) - data=bSize.get("data",64) + hash=bSize.get("hash", 256) + data=bSize.get("data", 64) diff --git a/src/datafile.py b/src/datafile.py --- a/src/datafile.py +++ b/src/datafile.py @@ -1,28 +1,28 @@ from hashtree import HashTree -BLOCK_SIZE=HashTree.BLOCK_SIZE +BLOCK_SIZE = HashTree.BLOCK_SIZE class DataFile: - def __init__(self,fileHandle): - self._lastIndex=0 - self._f=fileHandle + def __init__(self, fileHandle): + self._lastIndex = 0 + self._f = fileHandle @staticmethod - def open(filename,mode="rb"): - return DataFile(open(filename,mode=mode)) + def open(filename, mode="rb"): + return DataFile(open(filename, mode=mode)) - def writeAt(self,i,blockData): + def writeAt(self, i, blockData): if i!=self._lastIndex+1: self._f.seek(i*BLOCK_SIZE) self._f.write(blockData) - self._lastIndex=i + self._lastIndex = i - def readFrom(self,i,byteCount=BLOCK_SIZE): + def readFrom(self, i, byteCount=BLOCK_SIZE): if i!=self._lastIndex+1: self._f.seek(i*BLOCK_SIZE) - self._lastIndex=i + self._lastIndex = i return self._f.read(byteCount) def close(self): diff --git a/src/hashtree.py b/src/hashtree.py --- a/src/hashtree.py +++ b/src/hashtree.py @@ -11,28 +11,28 @@ def hashBlock(data): class HashTree: - HASH_LEN=16 # bytes - BLOCK_SIZE=4096 # bytes + HASH_LEN = 16 # bytes + BLOCK_SIZE = 4096 # bytes ## Prepares a tree containing leafCount leaves. - def __init__(self,leafCount): - self.store=[b""]*(leafCount*2-1) - self.leafStart=leafCount-1 - self.leafCount=leafCount - self._index=self.leafStart + def __init__(self, leafCount): + self.store = [b""]*(leafCount*2-1) + self.leafStart = leafCount-1 + self.leafCount = leafCount + self._index = self.leafStart @classmethod - def fromFile(cls,filename): - with open(filename,"rb") as f: - stat=os.fstat(f.fileno()) - size=stat.st_size # !! symlinks - leafCount=(size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks - res=cls(leafCount) + def fromFile(cls, filename): + with open(filename, "rb") as f: + stat = os.fstat(f.fileno()) + size = stat.st_size # !! symlinks + leafCount = (size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks + res = cls(leafCount) print(datetime.now(), "hashing file:") - progress=Progress(leafCount) + progress = Progress(leafCount) for i in range(leafCount): - data=f.read(HashTree.BLOCK_SIZE) + data = f.read(HashTree.BLOCK_SIZE) res.insertLeaf(hashBlock(data)) progress.p(i) @@ -42,19 +42,19 @@ class HashTree: return res @classmethod - def load(cls,filename): - with open(filename,"rb") as f: - stat=os.fstat(f.fileno()) - size=stat.st_size - nodeCount=size//HashTree.HASH_LEN - res=cls((nodeCount+1)//2) + def load(cls, filename): + with open(filename, "rb") as f: + stat = os.fstat(f.fileno()) + size = stat.st_size + nodeCount = size//HashTree.HASH_LEN + res = cls((nodeCount+1)//2) for i in range(nodeCount): - res.store[i]=f.read(HashTree.HASH_LEN) + res.store[i] = f.read(HashTree.HASH_LEN) return res - def save(self,filename): - with open(filename,"wb") as f: + def save(self, filename): + with open(filename, "wb") as f: for h in self.store: f.write(h) @@ -62,43 +62,43 @@ class HashTree: # # Useful and used only during the tree construction. def insertLeaf(self,h): - self.store[self._index]=h - self._index+=1 + self.store[self._index] = h + self._index += 1 ## Updates a hash stored in the leaf. - def updateLeaf(self,index,h): + def updateLeaf(self, index, h): if index=0: - self.store[index]=hashBlock(self.store[index*2+1]+self.store[index*2+2]) - index=(index-1)//2 + self.store[index] = hashBlock(self.store[index*2+1]+self.store[index*2+2]) + index = (index-1)//2 ## Fast construction of the tree over the leaves. O(n). def buildTree(self): print(datetime.now(), "building tree:") progress=Progress(-1, self.leafStart-1) - for i in range(self.leafStart-1,-1,-1): - self.store[i]=hashBlock(self.store[i*2+1]+self.store[i*2+2]) + for i in range(self.leafStart-1, -1, -1): + self.store[i] = hashBlock(self.store[i*2+1]+self.store[i*2+2]) progress.p(i) progress.done() ## Update faster than repeated insertLeaf. - def batchUpdate(self,keysHashes): - queue=collections.deque() - for (k,v) in sorted(keysHashes): - self.store[k]=v - parentK=(k-1)//2 + def batchUpdate(self, keysHashes): + queue = collections.deque() + for (k, v) in sorted(keysHashes): + self.store[k] = v + parentK = (k-1)//2 if len(queue)==0 or queue[-1]!=parentK: queue.append(parentK) while len(queue)>0: - k=queue.pop() - self.store[k]=hashBlock(self.store[k*2+1]+self.store[k*2+2]) - parentK=(k-1)//2 + k = queue.pop() + self.store[k] = hashBlock(self.store[k*2+1]+self.store[k*2+2]) + parentK = (k-1)//2 if (len(queue)==0 or queue[0]!=parentK) and k!=0: queue.appendleft(parentK) diff --git a/src/morevna.py b/src/morevna.py --- a/src/morevna.py +++ b/src/morevna.py @@ -13,38 +13,39 @@ from server import Miniserver def _checkFile(f): if not os.path.isfile(f): - print("invalid file specified:",f,file=sys.stderr) + print("invalid file specified:", f, file=sys.stderr) sys.exit(1) def buildTree(args): _checkFile(args.datafile) if os.path.isfile(args.treefile): - treeMod=os.stat(args.treefile).st_mtime - dataMod=os.stat(args.datafile).st_mtime + treeMod = os.stat(args.treefile).st_mtime + dataMod = os.stat(args.datafile).st_mtime if dataMod length + jsonLength = int(data.split(b":")[1].strip()) # "json-length: length" -> length - data=self._stream.readline() + data = self._stream.readline() assert data stats.logReceived(data) - binLength=int(data.split(b":")[1].strip()) # "bin-length: length" -> length + binLength = int(data.split(b":")[1].strip()) # "bin-length: length" -> length - jsonData=self._stream.read(jsonLength) + jsonData = self._stream.read(jsonLength) assert len(jsonData)==jsonLength stats.logReceived(jsonData) - jsonData=json.loads(str(jsonData,encoding="utf-8")) + jsonData = json.loads(str(jsonData,encoding="utf-8")) - binData=self._stream.read(binLength) + binData = self._stream.read(binLength) assert len(binData)==binLength stats.logReceived(binData) - return (jsonData,binData) + return (jsonData, binData) class NetworkWriter: - def __init__(self,stream): - self._stream=stream + def __init__(self, stream): + self._stream = stream - def writeMsg(self,*args): - msg=self.prepMsg(*args) + def writeMsg(self, *args): + msg = self.prepMsg(*args) self._stream.write(msg) self._stream.flush() stats.logSent(msg) - def prepMsg(self,jsonData,binData=b""): - jsonData=bytes(json.dumps(jsonData,separators=(',',':'))+"\n",encoding="utf-8") - jsonLength=bytes("json-length: "+str(len(jsonData))+"\n",encoding="utf-8") - binLength=bytes("bin-length: "+str(len(binData))+"\n",encoding="utf-8") + def prepMsg(self, jsonData, binData=b""): + jsonData = bytes(json.dumps(jsonData,separators=(',',':'))+"\n", encoding="utf-8") + jsonLength = bytes("json-length: "+str(len(jsonData))+"\n", encoding="utf-8") + binLength = bytes("bin-length: "+str(len(binData))+"\n", encoding="utf-8") - return b"".join((jsonLength,binLength,jsonData,binData)) + return b"".join((jsonLength, binLength, jsonData, binData)) diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -11,13 +11,13 @@ from datafile import DataFile class Connection(BaseConnection): - def __init__(self,serverSocket,sslContext): + def __init__(self, serverSocket, sslContext): super().__init__() sock, address = serverSocket.accept() - peer=sock.getpeername() + peer = sock.getpeername() try: - self._socket=sslContext.wrap_socket(sock,server_side=True) + self._socket = sslContext.wrap_socket(sock, server_side=True) except (ssl.SSLError,OSError) as e: log.warning("Failed to establish an SSL connection from {0}.".format(peer)) raise e @@ -27,70 +27,70 @@ class Connection(BaseConnection): class Miniserver: - def __init__(self,filename,treeFile=""): - self._filename=filename - self._treeFile=treeFile + def __init__(self, filename, treeFile=""): + self._filename = filename + self._treeFile = treeFile - self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers) - self._ssl.verify_mode=ssl.CERT_REQUIRED - self._ssl.load_cert_chain(conf.certfile,conf.keyfile) + self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers) + self._ssl.verify_mode = ssl.CERT_REQUIRED + self._ssl.load_cert_chain(conf.certfile, conf.keyfile) self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._ss.bind(("", conf.port)) self._ss.listen(1) def serve(self): - p=None + p = None with self._ss: while True: - try: connection=Connection(self._ss,self._ssl) - except (ssl.SSLError,OSError): continue + try: connection = Connection(self._ss, self._ssl) + except (ssl.SSLError, OSError): continue if p and p.is_alive(): with connection as c: c[0].readMsg() - c[1].writeMsg({"command":"deny","status":status.locked}) + c[1].writeMsg({"command":"deny", "status":status.locked}) continue - p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile)) + p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._treeFile)) p.start() class Server(NetNode): - def __init__(self,connection,filename,treeFile=""): - super().__init__(filename,treeFile) - (self._incoming,self._outcoming)=connection + def __init__(self, connection, filename, treeFile=""): + super().__init__(filename, treeFile) + (self._incoming, self._outcoming) = connection - self.BLOCK_SIZE=self._tree.BLOCK_SIZE + self.BLOCK_SIZE = self._tree.BLOCK_SIZE - self._dataFileObj=None + self._dataFileObj = None @staticmethod - def run(connection,*args): + def run(connection, *args): with connection as c: - s=Server(c,*args) + s = Server(c,*args) s.serve() @property def _dataFile(self): if not self._dataFileObj: - self._dataFileObj=DataFile.open(self._filename, mode="rb+") + self._dataFileObj = DataFile.open(self._filename, mode="rb+") return self._dataFileObj def serve(self): try: while self._serveOne(): pass - except (AssertionError,ConnectionResetError) as e: + except (AssertionError, ConnectionResetError) as e: log.warning(e) def _serveOne(self): - jsonData,binData=self._incoming.readMsg() + jsonData, binData = self._incoming.readMsg() if jsonData["command"]=="init": if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount: - self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters}) + self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.parameters}) if jsonData["version"]1024: - x/=1024 - i+=1 - if x>=100: x=round(x) - elif x>=10: x=round(x,1) - else: x=round(x,2) - return "{0} {1}".format(x,exts[i]) + x /= 1024 + i += 1 + if x>=100: x = round(x) + elif x>=10: x = round(x, 1) + else: x = round(x, 2) + return "{0} {1}".format(x, exts[i]) diff --git a/src/status.py b/src/status.py --- a/src/status.py +++ b/src/status.py @@ -1,5 +1,5 @@ class incompatible: - version=100 - parameters=101 + version = 100 + parameters = 101 -locked=102 +locked = 102 diff --git a/src/tests/__init__.py b/src/tests/__init__.py --- a/src/tests/__init__.py +++ b/src/tests/__init__.py @@ -2,14 +2,14 @@ import sys class RedirectedOutput(): - _stdout=None + _stdout = None @classmethod def setUpClass(cls): - cls._stdout=sys.stdout - sys.stdout=open("/tmp/morevna-stdout.log",mode="a") + cls._stdout = sys.stdout + sys.stdout = open("/tmp/morevna-stdout.log", mode="a") @classmethod def tearDownClass(cls): sys.stdout.close() - sys.stdout=cls._stdout + sys.stdout = cls._stdout diff --git a/src/tests/test_hashtree.py b/src/tests/test_hashtree.py --- a/src/tests/test_hashtree.py +++ b/src/tests/test_hashtree.py @@ -9,23 +9,23 @@ random.seed(17) def buildTree(leaves): - tree=HashTree(len(leaves)) + tree = HashTree(len(leaves)) for l in leaves: tree.insertLeaf(l) tree.buildTree() return tree -class TestMorevna(RedirectedOutput,TestCase): +class TestMorevna(RedirectedOutput, TestCase): def test_batchUpdate(self): - leaves=[b"a" for i in range(8)] - t1=buildTree(leaves) - keys=list(range(8)) + leaves = [b"a" for i in range(8)] + t1 = buildTree(leaves) + keys = list(range(8)) for i in range(8): random.shuffle(keys) for k in keys[:i+1]: - leaves[k]=bytes([random.randrange(256)]) - t2=buildTree(leaves) - t1.batchUpdate((k+t1.leafStart,leaves[k]) for k in keys[:i+1]) - self.assertEqual(t1.store,t2.store) + leaves[k] = bytes([random.randrange(256)]) + t2 = buildTree(leaves) + t1.batchUpdate((k+t1.leafStart, leaves[k]) for k in keys[:i+1]) + self.assertEqual(t1.store, t2.store) diff --git a/src/tests/test_overall.py b/src/tests/test_overall.py --- a/src/tests/test_overall.py +++ b/src/tests/test_overall.py @@ -13,31 +13,31 @@ from . import RedirectedOutput config.logger.removeHandler(config.handler) -handler=FileHandler("/tmp/morevna.log") +handler = FileHandler("/tmp/morevna.log") handler.setFormatter(config.formatter) config.logger.addHandler(handler) -config.batchSize.hash=8 -config.batchSize.data=8 +config.batchSize.hash = 8 +config.batchSize.data = 8 -dataDir=os.path.join(config.directory,"src/tests/data") -filename=os.path.join(dataDir,"test.img") +dataDir = os.path.join(config.directory, "src/tests/data") +filename = os.path.join(dataDir, "test.img") -def compareFiles(f1,f2): - with open(f1,mode="rb") as f: - h2=hashlib.sha256(f.read()).hexdigest() - with open(f2,mode="rb") as f: - h=hashlib.sha256(f.read()).hexdigest() - return (h,h2) +def compareFiles(f1, f2): + with open(f1, mode="rb") as f: + h2 = hashlib.sha256(f.read()).hexdigest() + with open(f2, mode="rb") as f: + h = hashlib.sha256(f.read()).hexdigest() + return (h, h2) -class TestMorevna(RedirectedOutput,TestCase): - _stdout=None +class TestMorevna(RedirectedOutput, TestCase): + _stdout = None def setUp(self): - src=os.path.join(dataDir,"test1.img") - shutil.copyfile(src,filename) + src = os.path.join(dataDir, "test1.img") + shutil.copyfile(src, filename) @classmethod def tearDownClass(cls): @@ -45,68 +45,68 @@ class TestMorevna(RedirectedOutput,TestC os.remove(filename) def test_build(self): - treeFile=os.path.join(dataDir,"test.bin") - refFile=os.path.join(dataDir,"test1.bin") + treeFile = os.path.join(dataDir, "test.bin") + refFile = os.path.join(dataDir, "test1.bin") - tree=HashTree.fromFile(os.path.join(dataDir,"test1.img")) + tree = HashTree.fromFile(os.path.join(dataDir, "test1.img")) tree.save(treeFile) - self.assertEqual(*compareFiles(refFile,treeFile)) + self.assertEqual(*compareFiles(refFile, treeFile)) os.remove(treeFile) def test_push(self): - config.port+=1 - ms=Miniserver(filename) - p=multiprocessing.Process(target=ms.serve) + config.port += 1 + ms = Miniserver(filename) + p = multiprocessing.Process(target=ms.serve) p.start() - for clientFile in ("test2.img","test3.img","test4.img"): - clientFile=os.path.join(dataDir,clientFile) - c=Client(clientFile) - with ClientConnection("127.0.0.1",config.port) as con: + for clientFile in ("test2.img", "test3.img", "test4.img"): + clientFile = os.path.join(dataDir, clientFile) + c = Client(clientFile) + with ClientConnection("127.0.0.1", config.port) as con: c.setConnection(con) c.init("push") - blocksToTransfer=c.negotiate() + blocksToTransfer = c.negotiate() c.sendData(blocksToTransfer) - self.assertEqual(*compareFiles(clientFile,filename)) + self.assertEqual(*compareFiles(clientFile, filename)) p.terminate() p.join() def test_pull(self): - config.port+=1 - serverFile=os.path.join(dataDir,"test3.img") - ms=Miniserver(serverFile) - p=multiprocessing.Process(target=ms.serve) + config.port += 1 + serverFile = os.path.join(dataDir, "test3.img") + ms = Miniserver(serverFile) + p = multiprocessing.Process(target=ms.serve) p.start() - c=Client(filename) - with ClientConnection("127.0.0.1",config.port) as con: + c = Client(filename) + with ClientConnection("127.0.0.1", config.port) as con: c.setConnection(con) c.init("pull") - blocksToTransfer=c.negotiate() + blocksToTransfer = c.negotiate() c.pullData(blocksToTransfer) - self.assertEqual(*compareFiles(serverFile,filename)) + self.assertEqual(*compareFiles(serverFile, filename)) p.terminate() p.join() def test_deny(self): - config.port+=1 - ms=Miniserver(filename) - p=multiprocessing.Process(target=ms.serve) + config.port += 1 + ms = Miniserver(filename) + p = multiprocessing.Process(target=ms.serve) p.start() - c1=Client(os.path.join(dataDir,"test2.img")) - with ClientConnection("127.0.0.1",config.port) as con1: + c1 = Client(os.path.join(dataDir, "test2.img")) + with ClientConnection("127.0.0.1", config.port) as con1: c1.setConnection(con1) c1.init("push") - c2=Client(os.path.join(dataDir,"test3.img")) - with ClientConnection("127.0.0.1",config.port) as con2: + c2 = Client(os.path.join(dataDir, "test3.img")) + with ClientConnection("127.0.0.1", config.port) as con2: c2.setConnection(con2) with self.assertRaises(DeniedConnection): c2.init("push") diff --git a/src/util.py b/src/util.py --- a/src/util.py +++ b/src/util.py @@ -27,7 +27,7 @@ def spawnDaemon(fun): print("[{0}] server running".format(pid)) sys.exit(0) except OSError as e: - print("fork #2 failed: {0} ({1})".format(e.errno,e.strerror),file=sys.stderr) + print("fork #2 failed: {0} ({1})".format(e.errno, e.strerror), file=sys.stderr) sys.exit(1) fun() @@ -36,73 +36,73 @@ def spawnDaemon(fun): os._exit(os.EX_OK) -def splitHost(host,defaultPort=0): - address,_,port=host.partition(":") - if not port: port=defaultPort - return (address,port) +def splitHost(host, defaultPort=0): + address, _, port = host.partition(":") + if not port: port = defaultPort + return (address, port) class Progress: - def __init__(self,n,i0=0): - self._n=n - self._i0=i0 - self._i=i0 - self._last="" - self._past=collections.deque() + def __init__(self, n, i0=0): + self._n = n + self._i0 = i0 + self._i = i0 + self._last = "" + self._past = collections.deque() - def p(self,i): - i0=self._i0 - n=self._n - now=datetime.now() + def p(self, i): + i0 = self._i0 + n = self._n + now = datetime.now() - assert i0<=i1: - self._past.append((now,i)) + self._past.append((now, i)) if res!=self._last or (now-self._past[0][0]).total_seconds()>5: - eta=formatSeconds(self.eta(i)) - self._print("{0} (ETA {1})".format(res,eta)) - self._last=res + eta = formatSeconds(self.eta(i)) + self._print("{0} (ETA {1})".format(res, eta)) + self._last = res while (now-self._past[0][0]).total_seconds()>5: self._past.popleft() def done(self): - self._print("100%",end="\n") + self._print("100%", end="\n") - def eta(self,i2): - t2=datetime.now() - (t1,i1)=self._past[0] + def eta(self, i2): + t2 = datetime.now() + (t1, i1) = self._past[0] if i2==i1: return float("nan") return (self._n-i2)/(i2-i1)*(t2-t1).total_seconds() @staticmethod - def _p(i,n,i0): + def _p(i, n, i0): _1=1 if n>=i0 else -1 return 100*(i+_1-i0)//(n-i0) - def _print(self,s,end=""): - print("\r"+" "*80,end="") - print("\r"+s,end=end) + def _print(self, s, end=""): + print("\r"+" "*80, end="") + print("\r"+s, end=end) def formatSeconds(secs): if math.isnan(secs): return "?" - secs=round(secs) + secs = round(secs) if secs<60: return "{0}s".format(secs) - mins=secs//60 - secs%=60 + mins = secs//60 + secs %= 60 if mins<60: return "{0:02}:{1:02}".format(mins, secs) - hours=mins//60 - mins%=60 + hours = mins//60 + mins %= 60 return "{0:02}:{1:02}:{2:02}".format(hours, mins, secs) if __name__=="__main__": import random import time - progress=Progress(100) - for i in range(1,100): + progress = Progress(100) + for i in range(1, 100): progress.p(i) time.sleep(random.random()*2) progress.done()