Changeset - 870c5c6c334f
[Not reviewed]
default
0 3 0
Laman - 5 years ago 2020-05-24 21:55:14

reacquiring old locks
3 files changed with 15 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -145,50 +145,52 @@ class Client(NetNode):
 
				print(
 
					"The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				return
 
		log.info(blocks_to_transfer)
 
		data_file = DataFile.open(self._filename, mode="rb+")
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		last_flushed = datetime.now().timestamp()
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = blocks_to_transfer[k:k+conf.batch_size.data]
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"})
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data
 
			for (j, i) in enumerate(indices):
 
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				data_file.write_at(i, block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 
				if self._tree_file:
 
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)
 

	
 
					t = datetime.now().timestamp()
 
					if t-last_flushed >= 60 and self._tree_file:
 
					if t-last_flushed >= 60:
 
						if self._tree_file:
 
						self._update_tree()
 
						self._refresh_lock()
 
						last_flushed = t
 

	
 
				stats.log_transferred_block()
 
				progress.p(k+j)
 
		progress.done()
 

	
 
		self._outcoming.write_msg({"command": "end"})
 

	
 
		log.info("closing session...")
 
		data_file.close()
 
		self._unlock()
 

	
 
		if self._tree_file:
 
			self._update_tree()
 

	
 
	def _send_data(self, indices, blocks):
 
		json_data = {"command": "send", "index": indices, "dataType": "data"}
 
		bin_data = b"".join(blocks)
 
		self._outcoming.write_msg(json_data, bin_data)
 
		stats.log_transferred_block(len(indices))
 
		json_data, bin_data = self._incoming.read_msg()
 
		assert json_data["command"]=="ack" and json_data["index"]==indices, json_data
 

	
 
	def set_connection(self, connection):
src/netnode.py
Show inline comments
 
import os
 
from datetime import datetime
 
import socket
 
import logging as log
 

	
 
import config as conf
 
from networkers import NetworkReader, NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
lock_file = os.path.join(conf.directory, "dirty.lock")
 

	
 

	
 
class FailedConnection(Exception): pass
 
class LockedException(Exception): pass
 
class IncompatibleException(Exception): pass
 

	
 

	
 
class BaseConnection:  # abstract
 
	def __init__(self):
 
		self._socket = None
 
		self.incoming = None
 
		self.outcoming = None
 

	
 
	def create_networkers(self):
 
		fr = self._socket.makefile(mode="rb")
 
@@ -41,35 +42,43 @@ class BaseConnection:  # abstract
 

	
 
class NetNode:
 
	def __init__(self, filename, tree_file=""):
 
		self._incoming = None
 
		self._outcoming = None
 

	
 
		self._filename = filename
 
		self._tree_file = tree_file
 

	
 
		if tree_file:
 
			self._tree = HashTree.load(tree_file)
 
		else:
 
			self._tree = HashTree.from_file(filename)
 

	
 
		self._new_leaves = dict()
 

	
 
	def is_locked(self):
 
		return os.path.isfile(lock_file)
 

	
 
	def _lock(self):
 
		try:
 
			f = open(lock_file, "x")
 
			f.close()
 
		except FileExistsError:
 
			stat = os.stat(lock_file)
 
			dt = datetime.now().timestamp()-stat.st_mtime
 
			if dt<5*60:
 
			raise LockedException()
 
			log.warning("Found an old lock file ({0}s), ignoring it.".format(round(dt)))
 
			self._refresh_lock()
 

	
 
	def _refresh_lock(self):
 
		os.utime(lock_file)
 

	
 
	def _unlock(self):
 
		os.remove(lock_file)
 

	
 
	def _update_tree(self):
 
		log.info("updating hash tree...")
 
		self._tree.batch_update(self._new_leaves.items())
 
		self._tree.save(self._tree_file)
 
		self._new_leaves = dict()
 
		log.info("tree updated")
src/server.py
Show inline comments
 
@@ -129,37 +129,39 @@ class Server(NetNode):
 
		return (json_response, bin_response)
 

	
 
	def _request_data(self, index):
 
		log.info("received request for data blocks {0}".format(index))
 

	
 
		json_response = {"command": "send", "index": index, "dataType": "data"}
 
		blocks = []
 
		for i in index:
 
			blocks.append(self._data_file.read_from(i))
 

	
 
		return (json_response, b"".join(blocks))
 

	
 
	def _receive_data(self, json_data, bin_data):
 
		if not self.is_locked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))
 

	
 
		indices = json_data["index"]
 
		for (i, k) in enumerate(indices):
 
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._data_file.write_at(k, block)
 
			if self._tree_file:
 
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)
 

	
 
		t = datetime.now().timestamp()
 
		if t-self._last_flushed>=60 and self._tree_file:
 
		if t-self._last_flushed>=60:
 
			if self._tree_file:
 
			self._update_tree()
 
			self._refresh_lock()
 
			self._last_flushed = t
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._data_file.close()
 
		self._data_file_obj = None
 
		if self._tree_file:
 
			self._update_tree()
 
		log.info("done")
0 comments (0 inline, 0 general)