Changeset - 83dc5e1e183e
[Not reviewed]
default
0 3 0
Laman - 5 years ago 2020-05-24 21:23:39

regularly flushing the hash tree to the disk during a data transfer
3 files changed with 16 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -151,6 +151,7 @@ class Client(NetNode):
 
		print(datetime.now(), "receiving data:")
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		last_flushed = datetime.now().timestamp()
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = blocks_to_transfer[k:k+conf.batch_size.data]
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"})
 
@@ -160,10 +161,14 @@ class Client(NetNode):
 
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				data_file.write_at(i, block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 
				if self._tree_file:
 
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 
					t = datetime.now().timestamp()
 
					if t-last_flushed >= 60 and self._tree_file:
 
						self._update_tree()
 
						last_flushed = t
 

	
 
				stats.log_transferred_block()
 
				progress.p(k+j)
src/netnode.py
Show inline comments
 
@@ -71,4 +71,5 @@ class NetNode:
 
		log.info("updating hash tree...")
 
		self._tree.batch_update(self._new_leaves.items())
 
		self._tree.save(self._tree_file)
 
		self._new_leaves = dict()
 
		log.info("tree updated")
src/server.py
Show inline comments
 
import socket
 
from datetime import datetime
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 
@@ -62,6 +63,7 @@ class Server(NetNode):
 
		self.BLOCK_SIZE = self._tree.BLOCK_SIZE
 

	
 
		self._data_file_obj = None
 
		self._last_flushed = 0
 

	
 
	@staticmethod
 
	def run(connection, *args):
 
@@ -94,6 +96,7 @@ class Server(NetNode):
 
			if json_data["action"]=="push" and not self.is_locked():
 
				self._lock()
 

	
 
			self._last_flushed = datetime.now().timestamp()
 
			self._outcoming.write_msg({"command": "init", "version": conf.version})
 

	
 
		elif json_data["command"]=="req":
 
@@ -146,6 +149,11 @@ class Server(NetNode):
 
			if self._tree_file:
 
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)
 

	
 
		t = datetime.now().timestamp()
 
		if t-self._last_flushed>=60 and self._tree_file:
 
			self._update_tree()
 
			self._last_flushed = t
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
0 comments (0 inline, 0 general)