diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -10,6 +10,7 @@ import stats from util import Progress from hashtree import HashTree,hashBlock from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException +from datafile import DataFile class DeniedConnection(Exception): pass @@ -142,28 +143,27 @@ class Client(NetNode): print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).") return log.info(blocksToTransfer) - dataFile=open(self._filename, mode="rb+") - i1=-1 + dataFile=DataFile.open(self._filename, mode="rb+") print(datetime.now(), "receiving data:") progress=Progress(len(blocksToTransfer)) - for (k,i2) in enumerate(blocksToTransfer): - self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"}) - jsonData,binData=self._incoming.readMsg() - assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData - - if i1+1!=i2: - dataFile.seek(i2*HashTree.BLOCK_SIZE) - dataFile.write(binData) - if self._treeFile: - self._newLeaves[i2+self._tree.leafStart]=hashBlock(binData) + for k in range(0,len(blocksToTransfer),conf.batchSize): + indices=blocksToTransfer[k:k+conf.batchSize] + self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"}) + jsonData,binData=self._incoming.readMsg() + assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData + for (j,i) in enumerate(indices): + block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] + dataFile.writeAt(i,block) - log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:])) + if self._treeFile: + self._newLeaves[i+self._tree.leafStart]=hashBlock(block) - stats.logTransferredBlock() - i1=i2 - progress.p(k) + log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:])) + + stats.logTransferredBlock() + progress.p(k+j) progress.done() self._outcoming.writeMsg({"command":"end"}) diff --git a/src/datafile.py b/src/datafile.py new file mode 100644 --- /dev/null +++ b/src/datafile.py @@ -0,0 +1,29 @@ +from hashtree import HashTree + + +BLOCK_SIZE=HashTree.BLOCK_SIZE + + +class DataFile: + def __init__(self,fileHandle): + self._lastIndex=0 + self._f=fileHandle + + @staticmethod + def open(filename,mode="rb"): + return DataFile(open(filename,mode=mode)) + + def writeAt(self,i,blockData): + if i!=self._lastIndex+1: + self._f.seek(i*BLOCK_SIZE) + self._f.write(blockData) + self._lastIndex=i + + def readFrom(self,i,byteCount=BLOCK_SIZE): + if i!=self._lastIndex+1: + self._f.seek(i*BLOCK_SIZE) + self._lastIndex=i + return self._f.read(byteCount) + + def close(self): + self._f.close() diff --git a/src/morevna.py b/src/morevna.py --- a/src/morevna.py +++ b/src/morevna.py @@ -7,7 +7,7 @@ from util import spawnDaemon, splitHost import config as conf import stats from hashtree import HashTree -from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection +from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection, IncompatibleException from server import Miniserver @@ -53,6 +53,7 @@ def push(args): except DeniedConnection as e: print("Server {0}:{1} denied connection.".format(*host)) print(e) + except IncompatibleException as e: print(e) def pull(args): _checkFile(args.datafile) diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -123,11 +123,13 @@ class Server(NetNode): log.info("received request for data block #{0}".format(index)) jsonResponse={"command":"send", "index":index, "dataType":"data"} - if self._lastIndex+1!=index: - self._dataFile.seek(index*self.BLOCK_SIZE) - binResponse=self._dataFile.read(self.BLOCK_SIZE) + blocks=[] + for i in index: + if self._lastIndex+1!=i: + self._dataFile.seek(i*self.BLOCK_SIZE) + blocks.append(self._dataFile.read(self.BLOCK_SIZE)) - return (jsonResponse,binResponse) + return (jsonResponse,b"".join(blocks)) def _receiveData(self,jsonData,binData): if not self.isLocked(): self._lock()