Changeset - b0515ceb502d
[Not reviewed]
tip default
1 4 0
Laman - 5 years ago 2020-06-07 13:28:55

actually flushing the files
5 files changed with 9 insertions and 43 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -158,24 +158,25 @@ class Client(NetNode):
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data
 
			for (j, i) in enumerate(indices):
 
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				data_file.write_at(i, block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 
				if self._tree_file:
 
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)
 

	
 
					t = datetime.now().timestamp()
 
					if t-last_flushed >= 60:
 
						data_file.flush()
 
						if self._tree_file:
 
							self._update_tree()
 
						self._refresh_lock()
 
						last_flushed = t
 

	
 
				stats.log_transferred_block()
 
				progress.p(k+j)
 
		progress.done()
 

	
 
		self._outcoming.write_msg({"command": "end"})
 

	
 
		log.info("closing session...")
src/config.py
Show inline comments
 
@@ -22,14 +22,14 @@ logger.addHandler(handler)
 
certfile = os.path.join(directory,"certs/cert.pem")
 
keyfile = os.path.join(directory,"certs/key.pem")
 
peers = os.path.join(directory,"certs/peers.pem")
 

	
 
version = [0, 1, 1]
 
lowest_compatible = [0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway
 

	
 
hosts = conf.get("hosts", ["127.0.0.1"])
 
port = conf.get("port", 9901)
 

	
 
bSize = conf.get("batch_size", dict())
 
class batch_size:
 
	hash = bSize.get("hash", 256)
 
	hash = bSize.get("hash", 16384)
 
	data = bSize.get("data", 64)
src/datafile.py
Show inline comments
 
@@ -16,14 +16,17 @@ class DataFile:
 
	def write_at(self, i, block_data):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._f.write(block_data)
 
		self._last_index = i
 

	
 
	def read_from(self, i, byte_count=BLOCK_SIZE):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._last_index = i
 
		return self._f.read(byte_count)
 

	
 
	def flush(self):
 
		self._f.flush()
 

	
 
	def close(self):
 
		self._f.close()
src/morevna.sh
Show inline comments
 
deleted file
src/server.py
Show inline comments
 
@@ -85,25 +85,27 @@ class Server(NetNode):
 

	
 
	def _serve_one(self):
 
		json_data, bin_data = self._incoming.read_msg()
 

	
 
		if json_data["command"]=="init":
 
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
 
			if json_data["version"]<conf.lowest_compatible:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
 
			if json_data["action"]=="pull" and self.is_locked():
 
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
 
			if json_data["action"]=="push" and not self.is_locked():
 
				self._lock()
 
				try: self._lock()
 
				except Exception:
 
					pass
 

	
 
			self._last_flushed = datetime.now().timestamp()
 
			self._outcoming.write_msg({"command": "init", "version": conf.version})
 

	
 
		elif json_data["command"]=="req":
 
			if json_data["dataType"]=="data":
 
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
 
			else:
 
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))
 

	
 
		elif json_data["command"]=="send" and json_data["dataType"]=="data":
 
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))
 
@@ -142,24 +144,25 @@ class Server(NetNode):
 
		if not self.is_locked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))
 

	
 
		indices = json_data["index"]
 
		for (i, k) in enumerate(indices):
 
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._data_file.write_at(k, block)
 
			if self._tree_file:
 
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)
 

	
 
		t = datetime.now().timestamp()
 
		if t-self._last_flushed>=60:
 
			self._data_file.flush()
 
			if self._tree_file:
 
				self._update_tree()
 
			self._refresh_lock()
 
			self._last_flushed = t
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._data_file.close()
 
		self._data_file_obj = None
 
		if self._tree_file:
0 comments (0 inline, 0 general)