# HG changeset patch # User Laman # Date 2018-01-20 00:29:18 # Node ID 87a9ced6e7b5965077373939e715a83e0a380333 # Parent ccbe369ce4396924166cd9101761b590cd0ca07e batch push diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -109,20 +109,24 @@ class Client(NetNode): print(datetime.now(), "sending data:") progress=Progress(len(blocksToTransfer)) - for (k,i2) in enumerate(blocksToTransfer): - jsonData={"command":"send", "index":i2, "dataType":"data"} - if i1+1!=i2: - dataFile.seek(i2*HashTree.BLOCK_SIZE) - binData=dataFile.read(HashTree.BLOCK_SIZE) - log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:])) + for k in range(0,len(blocksToTransfer),conf.batchSize): + indices=[] + blocks=[] + for j in range(conf.batchSize): + if k+j>=len(blocksToTransfer): break + i2=blocksToTransfer[k+j] + if i1+1!=i2: + dataFile.seek(i2*HashTree.BLOCK_SIZE) + block=dataFile.read(HashTree.BLOCK_SIZE) - self._outcoming.writeMsg(jsonData,binData) - stats.logTransferredBlock() - jsonData,binData=self._incoming.readMsg() - assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData - i1=i2 - progress.p(k) + indices.append(i2) + blocks.append(block) + log.info("block #{0}: {1}...{2}".format(i2,block[:5],block[-5:])) + + i1=i2 + progress.p(k+j) + if indices: self._sendData(indices,blocks) progress.done() self._outcoming.writeMsg({"command":"end","action":"push"}) @@ -171,5 +175,13 @@ class Client(NetNode): if self._treeFile: self._updateTree() + def _sendData(self,indices,blocks): + jsonData={"command":"send", "index":indices, "dataType":"data"} + binData=b"".join(blocks) + self._outcoming.writeMsg(jsonData,binData) + stats.logTransferredBlock(len(indices)) + jsonData,binData=self._incoming.readMsg() + assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData + def setConnection(self,connection): (self._incoming,self._outcoming)=connection diff --git a/src/config.py b/src/config.py --- a/src/config.py +++ 
b/src/config.py @@ -21,10 +21,10 @@ conf=dict() if os.path.isfile(configFile): with open(configFile) as f: conf=json.load(f) -version=[0,0,1] -lowestCompatible=[0,0,0] # tuple is more fitting but json conversion transforms it into a list anyway +version=[0,1,0] +lowestCompatible=[0,1,0] # tuple is more fitting but json conversion transforms it into a list anyway hosts=conf.get("hosts",["127.0.0.1"]) port=conf.get("port",9901) -batchSize=conf.get("batchSize",256) +batchSize=conf.get("batchSize",64) diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -133,15 +133,16 @@ class Server(NetNode): if not self.isLocked(): self._lock() log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:])) - i=jsonData["index"] - if self._lastIndex+1!=i: - self._dataFile.seek(i*self.BLOCK_SIZE) - self._dataFile.write(binData) - self._lastIndex=i - if self._treeFile: - self._newLeaves[i+self._tree.leafStart]=hashBlock(binData) + indices=jsonData["index"] + for (i,k) in enumerate(indices): + if self._lastIndex+1!=k: + self._dataFile.seek(k*self.BLOCK_SIZE) + self._dataFile.write(binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]) + self._lastIndex=k + if self._treeFile: + self._newLeaves[k+self._tree.leafStart]=hashBlock(binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]) - return ({"command": "ack", "index": i},) + return ({"command": "ack", "index": indices},) def _finalize(self): log.info("closing session...") diff --git a/src/stats.py b/src/stats.py --- a/src/stats.py +++ b/src/stats.py @@ -20,8 +20,8 @@ def logExchangedNode(k=1): stats.exchangedNodes+=k -def logTransferredBlock(): - stats.transferredBlocks+=1 +def logTransferredBlock(k=1): + stats.transferredBlocks+=k def reset():