diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -10,6 +10,7 @@ import stats from util import Progress from hashtree import HashTree,hashBlock from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException +from datafile import DataFile class DeniedConnection(Exception): pass @@ -142,28 +143,27 @@ class Client(NetNode): print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") return log.info(blocksToTransfer) - dataFile=open(self._filename, mode="rb+") - i1=-1 + dataFile=DataFile.open(self._filename, mode="rb+") print(datetime.now(), "receiving data:") progress=Progress(len(blocksToTransfer)) - for (k,i2) in enumerate(blocksToTransfer): - self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"}) - jsonData,binData=self._incoming.readMsg() - assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData - - if i1+1!=i2: - dataFile.seek(i2*HashTree.BLOCK_SIZE) - dataFile.write(binData) - if self._treeFile: - self._newLeaves[i2+self._tree.leafStart]=hashBlock(binData) + for k in range(0,len(blocksToTransfer),conf.batchSize): + indices=blocksToTransfer[k:k+conf.batchSize] + self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"}) + jsonData,binData=self._incoming.readMsg() + assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData + for (j,i) in enumerate(indices): + block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] + dataFile.writeAt(i,block) - log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:])) + if self._treeFile: + self._newLeaves[i+self._tree.leafStart]=hashBlock(block) - stats.logTransferredBlock() - i1=i2 - progress.p(k) + log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + + stats.logTransferredBlock() + progress.p(k+j) progress.done() self._outcoming.writeMsg({"command":"end"})