Changeset - b0515ceb502d
[Not reviewed]
tip default
1 4 0
Laman - 5 years ago 2020-06-07 13:28:55

actually flushing the files
5 files changed with 9 insertions and 43 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -74,124 +74,125 @@ class Client(NetNode):
 
		local_tree = self._tree
 
		blocks_to_transfer = []
 
		node_stack = collections.deque([0])  # root
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress = Progress(local_tree.leaf_count)
 
		while len(node_stack) > 0:
 
			indices = []
 
			for i in range(conf.batch_size.hash):
 
				indices.append(node_stack.pop())
 
				if len(node_stack) == 0: break
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "hash"})
 

	
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["index"] == indices
 
			assert json_data["dataType"] == "hash"
 
			stats.log_exchanged_node(len(indices))
 

	
 
			frontier = []
 
			for (j, i) in enumerate(indices):
 
				(j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)]
 
				if local_tree.store[i] != bin_data[j1:j2]:
 
					# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
 
					if 2*i+2 < len(local_tree.store):  # inner node
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocks_to_transfer.append(i-local_tree.leaf_start)  # leaf
 
						progress.p(i-local_tree.leaf_start)
 
			node_stack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size = stats.format_bytes(len(blocks_to_transfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocks_to_transfer
 

	
 
	def send_data(self, blocks_to_transfer):
		"""Read the listed blocks from the local data file and push them to the peer in batches.

		blocks_to_transfer: leaf-relative block indices to send, as produced by negotiation.
		"""
		log.info(blocks_to_transfer)
		source = DataFile.open(self._filename)

		print(datetime.now(), "sending data:")
		progress = Progress(len(blocks_to_transfer))

		batch = conf.batch_size.data
		for start in range(0, len(blocks_to_transfer), batch):
			batch_indices = []
			batch_blocks = []
			# Slice out one batch; the slice is naturally short at the tail.
			for (offset, index) in enumerate(blocks_to_transfer[start:start+batch]):
				chunk = source.read_from(index)
				batch_indices.append(index)
				batch_blocks.append(chunk)
				log.info("block #{0}: {1}...{2}".format(index, chunk[:5], chunk[-5:]))

				progress.p(start+offset)
			if batch_indices:
				self._send_data(batch_indices, batch_blocks)
		progress.done()

		self._outcoming.write_msg({"command": "end", "action": "push"})

		log.info("closing session...")
		source.close()
 

	
 
	def pull_data(self, blocks_to_transfer, ignore_lock=False):
		"""Fetch the listed blocks from the peer and write them into the local data file.

		Acquires the session lock first (skipped when ignore_lock is True, e.g. a
		forced re-pull), requests blocks in batches, writes each block in place,
		and checkpoints periodically so an interrupted pull loses little work.
		"""
		if not ignore_lock:
			try:
				self._lock()
			except LockedException:
				print(
					"The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
				return
		log.info(blocks_to_transfer)
		data_file = DataFile.open(self._filename, mode="rb+")

		print(datetime.now(), "receiving data:")
		progress = Progress(len(blocks_to_transfer))

		last_flushed = datetime.now().timestamp()
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
			indices = blocks_to_transfer[k:k+conf.batch_size.data]
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"})
			json_data, bin_data = self._incoming.read_msg()
			assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data
			for (j, i) in enumerate(indices):
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
				data_file.write_at(i, block)

				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
				if self._tree_file:
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)

				# Periodic checkpoint, at most once a minute: flush the data file,
				# persist the hash tree (when one is tracked) and refresh the lock.
				# BUGFIX: this block used to be nested inside the `if self._tree_file:`
				# branch above (the now-removed redundant inner `if self._tree_file:`
				# check betrayed the intended indentation), so pulls without a tree
				# file were never flushed and never refreshed their lock.
				t = datetime.now().timestamp()
				if t-last_flushed >= 60:
					data_file.flush()
					if self._tree_file:
						self._update_tree()
					self._refresh_lock()
					last_flushed = t

				stats.log_transferred_block()
				progress.p(k+j)
		progress.done()

		self._outcoming.write_msg({"command": "end"})

		log.info("closing session...")
		data_file.close()
		self._unlock()

		if self._tree_file:
			self._update_tree()
 

	
 
	def _send_data(self, indices, blocks):
		"""Transmit one batch of data blocks and wait for the peer's acknowledgement.

		indices: block indices being sent; blocks: the matching raw block payloads.
		"""
		header = {"command": "send", "index": indices, "dataType": "data"}
		payload = b"".join(blocks)
		self._outcoming.write_msg(header, payload)
		stats.log_transferred_block(len(indices))
		# The peer must ack exactly the indices we sent.
		response, _ = self._incoming.read_msg()
		assert response["command"]=="ack" and response["index"]==indices, response
 

	
 
	def set_connection(self, connection):
		"""Attach an established networker pair: connection is (incoming, outcoming)."""
		incoming, outcoming = connection
		self._incoming = incoming
		self._outcoming = outcoming
src/config.py
Show inline comments
 
import os
 
import json
 
import logging as log
 
from logging.handlers import TimedRotatingFileHandler
 

	
 

	
 
# Project root: one level above this src/ directory.
directory = os.path.join(os.path.dirname(__file__), "..")
config_file = os.path.join(directory, "config.json")
conf = dict()
# Optional JSON configuration; a missing file simply means all defaults apply.
if os.path.isfile(config_file):
	with open(config_file) as f: conf = json.load(f)

log_file = conf.get("logFile", "/var/log/morevna/mor.log")

# Root logger: INFO level, file rotated at midnight, keeping 9 old files.
logger = log.getLogger()
logger.setLevel(log.INFO)
formatter = log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=9)
handler.setFormatter(formatter)
logger.addHandler(handler)

# TLS material: our own certificate/key and the bundle of trusted peer certificates.
certfile = os.path.join(directory,"certs/cert.pem")
keyfile = os.path.join(directory,"certs/key.pem")
peers = os.path.join(directory,"certs/peers.pem")

version = [0, 1, 1]
lowest_compatible = [0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway

# Network defaults; overridable through config.json.
hosts = conf.get("hosts", ["127.0.0.1"])
port = conf.get("port", 9901)

# Raw batch-size settings, consumed by the batch_size class below.
bSize = conf.get("batch_size", dict())
 
class batch_size:
	# Number of hash-tree nodes requested per "hash" round trip, and number of
	# data blocks transferred per "data" round trip.
	# BUGFIX: removed a stale duplicate assignment `hash = bSize.get("hash", 256)`
	# that was immediately shadowed by the line below; the effective default
	# was (and remains) 16384, so runtime behavior is unchanged.
	hash = bSize.get("hash", 16384)
	data = bSize.get("data", 64)
src/datafile.py
Show inline comments
 
from hashtree import HashTree
 

	
 

	
 
BLOCK_SIZE = HashTree.BLOCK_SIZE
 

	
 

	
 
class DataFile:
	"""Block-oriented wrapper around an open file handle.

	Tracks the index of the last block touched so that strictly sequential
	access (block i followed by block i+1) can skip the redundant seek.
	"""

	def __init__(self, file_handle):
		# Index of the last block read or written; 0 before any access.
		self._last_index = 0
		self._f = file_handle

	@staticmethod
	def open(filename, mode="rb"):
		"""Open *filename* in *mode* and wrap the handle in a DataFile."""
		handle = open(filename, mode=mode)
		return DataFile(handle)

	def write_at(self, i, block_data):
		"""Write *block_data* at block index *i* (seeks only on non-sequential access)."""
		sequential = (i == self._last_index+1)
		if not sequential:
			self._f.seek(i*BLOCK_SIZE)
		self._f.write(block_data)
		self._last_index = i

	def read_from(self, i, byte_count=BLOCK_SIZE):
		"""Return *byte_count* bytes starting at block index *i*."""
		sequential = (i == self._last_index+1)
		if not sequential:
			self._f.seek(i*BLOCK_SIZE)
		self._last_index = i
		return self._f.read(byte_count)

	def flush(self):
		"""Push buffered writes down to the OS."""
		self._f.flush()

	def close(self):
		"""Close the underlying file handle."""
		self._f.close()
src/morevna.sh
Show inline comments
 
deleted file
src/server.py
Show inline comments
 
from datetime import datetime
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hash_block
 
from netnode import BaseConnection, NetNode
 
import config as conf
 
import status
 
from datafile import DataFile
 

	
 

	
 
class Connection(BaseConnection):
	"""Server side of one client connection: accept a TCP peer and wrap it in TLS."""

	def __init__(self, server_socket, ssl_context):
		"""Block on server_socket.accept(), perform the TLS handshake, set up networkers.

		Raises ssl.SSLError / OSError if the handshake fails (logged with the peer address).
		"""
		super().__init__()

		sock, address = server_socket.accept()
		# Remember the peer before wrapping: a failed handshake closes the socket.
		peer = sock.getpeername()
		try:
			self._socket = ssl_context.wrap_socket(sock, server_side=True)
		except (ssl.SSLError,OSError) as e:
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
			raise e

		log.info('Connected by {0}'.format(address))
		self.create_networkers()
 

	
 

	
 
class Miniserver:
	"""Listening front end: accepts TLS connections and hands each to a Server worker process."""

	def __init__(self, filename, tree_file=""):
		self._filename = filename
		self._tree_file = tree_file

		# Mutual TLS: require a client certificate signed by one of the known peers.
		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
		self._ssl.verify_mode = ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

		# Single-backlog listener on all interfaces at the configured port.
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		"""Accept connections forever, serving at most one client at a time.

		While a worker process is alive, new clients get a "deny"/locked reply
		and are disconnected. Handshake failures are simply retried.
		"""
		p = None
		with self._ss:
			while True:
				try: connection = Connection(self._ss, self._ssl)
				except (ssl.SSLError, OSError): continue
				if p and p.is_alive():
					# Busy: read the client's first message, refuse, and drop the connection.
					with connection as c:
						c[0].read_msg()
						c[1].write_msg({"command": "deny", "status":status.locked})
					continue
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._tree_file))
				p.start()
 

	
 

	
 
class Server(NetNode):
	"""Handles one synchronization session over an already-established connection."""

	def __init__(self, connection, filename, tree_file=""):
		super().__init__(filename, tree_file)
		(self._incoming, self._outcoming) = connection

		self.BLOCK_SIZE = self._tree.BLOCK_SIZE

		# Data file is opened lazily by the _data_file property.
		self._data_file_obj = None
		self._last_flushed = 0

	@staticmethod
	def run(connection, *args):
		"""Worker-process entry point: serve the connection, then close it."""
		with connection as c:
			s = Server(c, *args)
			s.serve()

	@property
	def _data_file(self):
		# Lazy open so a session that never touches data never opens the file.
		if not self._data_file_obj:
			self._data_file_obj = DataFile.open(self._filename, mode="rb+")
		return self._data_file_obj

	def serve(self):
		"""Process client messages until the session ends or the connection breaks."""
		try:
			while self._serve_one(): pass
		except (AssertionError, ConnectionResetError) as e:
			log.warning(e)

	def _serve_one(self):
		"""Handle a single client message; return False once the session is over."""
		json_data, bin_data = self._incoming.read_msg()

		if json_data["command"]=="init":
			# NOTE(review): each failed check writes its own "deny" message and the
			# "init" reply below is still sent afterwards; if the client reads only
			# one response per request this can desync the protocol — verify intent.
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
			if json_data["version"]<conf.lowest_compatible:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
			if json_data["action"]=="pull" and self.is_locked():
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
			if json_data["action"]=="push" and not self.is_locked():
				# BUGFIX: the lock was acquired twice in a row (a plain call
				# immediately followed by a try/except duplicate whose failure was
				# swallowed). A single attempt suffices — the guard above already
				# ensures the file is not locked — and failures still propagate.
				self._lock()

			self._last_flushed = datetime.now().timestamp()
			self._outcoming.write_msg({"command": "init", "version": conf.version})

		elif json_data["command"]=="req":
			if json_data["dataType"]=="data":
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
			else:
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))

		elif json_data["command"]=="send" and json_data["dataType"]=="data":
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))

		elif json_data["command"]=="end":
			self._finalize()
			if json_data.get("action")=="push": self._unlock()
			return False

		else:
			assert False, json_data["command"]

		return True

	def _request_hash(self, indices):
		"""Build the reply for a batch of hash-tree node requests."""
		log.info("received request for nodes {0}".format(indices))
		assert all(i<len(self._tree.store) for i in indices)
		hashes = [self._tree.store[i] for i in indices]

		json_response = {"command": "send", "index": indices, "dataType": "hash"}
		bin_response = b"".join(hashes)

		return (json_response, bin_response)

	def _request_data(self, index):
		"""Build the reply for a batch of data-block requests."""
		log.info("received request for data blocks {0}".format(index))

		json_response = {"command": "send", "index": index, "dataType": "data"}
		blocks = [self._data_file.read_from(i) for i in index]

		return (json_response, b"".join(blocks))

	def _receive_data(self, json_data, bin_data):
		"""Write a batch of pushed blocks into the data file and build the ack reply."""
		if not self.is_locked(): self._lock()
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))

		indices = json_data["index"]
		for (i, k) in enumerate(indices):
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._data_file.write_at(k, block)
			if self._tree_file:
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)

		# Checkpoint at most once a minute: flush the data file, persist the
		# hash tree (when tracked) and refresh the session lock.
		t = datetime.now().timestamp()
		if t-self._last_flushed>=60:
			self._data_file.flush()
			if self._tree_file:
				self._update_tree()
			self._refresh_lock()
			self._last_flushed = t

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		"""Close the data file and persist the final hash tree."""
		log.info("closing session...")
		self._data_file.close()
		self._data_file_obj = None
		if self._tree_file:
			self._update_tree()
		log.info("done")
0 comments (0 inline, 0 general)