diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -151,6 +151,7 @@ class Client(NetNode): print(datetime.now(), "receiving data:") progress = Progress(len(blocks_to_transfer)) + last_flushed = datetime.now().timestamp() for k in range(0, len(blocks_to_transfer), conf.batch_size.data): indices = blocks_to_transfer[k:k+conf.batch_size.data] self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"}) @@ -160,10 +161,14 @@ class Client(NetNode): block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE] data_file.write_at(i, block) + log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) if self._tree_file: self._new_leaves[i+self._tree.leaf_start] = hash_block(block) - log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:])) + t = datetime.now().timestamp() + if t-last_flushed >= 60 and self._tree_file: + self._update_tree() + last_flushed = t stats.log_transferred_block() progress.p(k+j) diff --git a/src/netnode.py b/src/netnode.py --- a/src/netnode.py +++ b/src/netnode.py @@ -71,4 +71,5 @@ class NetNode: log.info("updating hash tree...") self._tree.batch_update(self._new_leaves.items()) self._tree.save(self._tree_file) + self._new_leaves = dict() log.info("tree updated") diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -1,4 +1,5 @@ -import socket +from datetime import datetime +import socket import ssl import multiprocessing import logging as log @@ -62,6 +63,7 @@ class Server(NetNode): self.BLOCK_SIZE = self._tree.BLOCK_SIZE self._data_file_obj = None + self._last_flushed = 0 @staticmethod def run(connection, *args): @@ -94,6 +96,7 @@ class Server(NetNode): if json_data["action"]=="push" and not self.is_locked(): self._lock() + self._last_flushed = datetime.now().timestamp() self._outcoming.write_msg({"command": "init", "version": conf.version}) elif json_data["command"]=="req": @@ -146,6 +149,11 @@ class Server(NetNode): if self._tree_file: self._new_leaves[k+self._tree.leaf_start] = hash_block(block) + t = datetime.now().timestamp() + if t-self._last_flushed>=60 and self._tree_file: + self._update_tree() + self._last_flushed = t + return ({"command": "ack", "index": indices},) def _finalize(self):