# Morevna/src/server.py
# From commit b0515ceb502d ("actually flushing the files") by Laman.
from datetime import datetime
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hash_block
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
	"""Server side of a single client connection.

	Accepts one pending client on a listening socket and wraps the accepted
	socket in TLS before handing it to the base networkers.
	"""

	def __init__(self, server_socket, ssl_context):
		"""Accept a client and perform the server-side TLS handshake.

		server_socket -- a listening socket with a connection pending
		ssl_context -- ssl.SSLContext used to wrap the accepted socket

		Raises ssl.SSLError or OSError when the handshake fails; the accepted
		plain socket is closed before re-raising so the descriptor cannot leak.
		"""
		super().__init__()

		sock, address = server_socket.accept()
		peer = sock.getpeername()
		try:
			self._socket = ssl_context.wrap_socket(sock, server_side=True)
		except (ssl.SSLError, OSError):
			# Close the raw socket on handshake failure so the fd doesn't leak,
			# then re-raise with the original traceback (bare raise).
			sock.close()
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
			raise

		log.info('Connected by {0}'.format(address))
		self.create_networkers()


class Miniserver:
	"""Accepts TLS clients on the configured port and serves them one at a time.

	Each accepted session is handled by a Server running in its own process;
	while that process is alive, further clients are refused with a locked
	status.
	"""

	def __init__(self, filename, tree_file=""):
		"""Prepare the TLS context and the listening socket.

		filename -- path of the synchronized data file
		tree_file -- optional path of the stored hash tree
		"""
		self._filename = filename
		self._tree_file = tree_file

		# Mutual TLS: peers must present a certificate signed by conf.peers.
		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
		self._ssl.verify_mode = ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		"""Accept clients forever, delegating each session to a worker process."""
		worker = None
		with self._ss:
			while True:
				try:
					connection = Connection(self._ss, self._ssl)
				except (ssl.SSLError, OSError):
					# Handshake failed; just wait for the next client.
					continue

				if worker and worker.is_alive():
					# A session is still running: read the client's opening
					# message and refuse it.
					with connection as (reader, writer):
						reader.read_msg()
						writer.write_msg({"command": "deny", "status": status.locked})
					continue

				worker = multiprocessing.Process(
					target=Server.run,
					args=(connection, self._filename, self._tree_file)
				)
				worker.start()


class Server(NetNode):
	"""One synchronization session: answers a single client's protocol messages.

	The protocol is message based: the client sends "init", then a series of
	"req" (request hashes or data blocks) and "send" (push data blocks)
	commands, and finally "end". Dispatch happens in _serve_one.
	"""

	def __init__(self, connection, filename, tree_file=""):
		"""Set up session state over an established connection.

		connection -- unpacks into an (incoming, outcoming) channel pair
		filename -- path of the synchronized data file
		tree_file -- optional path of the stored hash tree
		"""
		super().__init__(filename, tree_file)
		(self._incoming, self._outcoming) = connection

		self.BLOCK_SIZE = self._tree.BLOCK_SIZE

		self._data_file_obj = None  # opened lazily by the _data_file property
		self._last_flushed = 0  # timestamp of the last periodic flush

	@staticmethod
	def run(connection, *args):
		"""Worker-process entry point: serve one session, then close the connection."""
		with connection as c:
			s = Server(c, *args)
			s.serve()

	@property
	def _data_file(self):
		# Open the data file lazily, on first actual read or write.
		if not self._data_file_obj:
			self._data_file_obj = DataFile.open(self._filename, mode="rb+")
		return self._data_file_obj

	def serve(self):
		"""Process messages until the client ends the session or the link breaks."""
		try:
			while self._serve_one(): pass
		except (AssertionError, ConnectionResetError) as e:
			log.warning(e)

	def _serve_one(self):
		"""Handle a single incoming message.

		Returns True to keep the session alive, False to end it.
		Raises AssertionError on an unknown command (caught in serve).
		"""
		json_data, bin_data = self._incoming.read_msg()

		if json_data["command"]=="init":
			# A deny reply must end the session: the original code fell
			# through and also sent the "init" acknowledgement, giving the
			# client two contradictory responses to one message.
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
				return False
			if json_data["version"]<conf.lowest_compatible:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
				return False
			if json_data["action"]=="pull" and self.is_locked():
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
				return False
			if json_data["action"]=="push" and not self.is_locked():
				# Best effort: _receive_data retries locking before writing,
				# so a failure here is logged rather than fatal.
				try: self._lock()
				except Exception as e:
					log.warning("failed to acquire the lock: {0}".format(e))

			self._last_flushed = datetime.now().timestamp()
			self._outcoming.write_msg({"command": "init", "version": conf.version})

		elif json_data["command"]=="req":
			if json_data["dataType"]=="data":
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
			else:
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))

		elif json_data["command"]=="send" and json_data["dataType"]=="data":
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))

		elif json_data["command"]=="end":
			self._finalize()
			if json_data.get("action")=="push": self._unlock()
			return False

		else:
			assert False, json_data["command"]

		return True

	def _request_hash(self, indices):
		"""Return a (json, binary) response carrying the requested tree nodes."""
		log.info("received request for nodes {0}".format(indices))
		# Out-of-range indices abort the session via serve()'s AssertionError handler.
		assert all(i<len(self._tree.store) for i in indices)
		hashes = [self._tree.store[i] for i in indices]

		json_response = {"command": "send", "index": indices, "dataType": "hash"}
		bin_response = b"".join(hashes)

		return (json_response, bin_response)

	def _request_data(self, index):
		"""Return a (json, binary) response carrying the requested data blocks."""
		log.info("received request for data blocks {0}".format(index))

		json_response = {"command": "send", "index": index, "dataType": "data"}
		blocks = []
		for i in index:
			blocks.append(self._data_file.read_from(i))

		return (json_response, b"".join(blocks))

	def _receive_data(self, json_data, bin_data):
		"""Write pushed blocks to the data file and acknowledge them.

		Periodically (once a minute) flushes the file, updates the hash tree
		and refreshes the session lock.
		"""
		if not self.is_locked(): self._lock()
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))

		indices = json_data["index"]
		for (i, k) in enumerate(indices):
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._data_file.write_at(k, block)
			if self._tree_file:
				# Remember the new leaf hash; the tree itself is rewritten in batches.
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)

		t = datetime.now().timestamp()
		if t-self._last_flushed>=60:
			self._data_file.flush()
			if self._tree_file:
				self._update_tree()
			self._refresh_lock()
			self._last_flushed = t

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		"""Flush everything to disk at the end of a session."""
		log.info("closing session...")
		# Close only if the file was actually opened: reading the _data_file
		# property here would pointlessly open it just to close it.
		if self._data_file_obj:
			self._data_file_obj.close()
			self._data_file_obj = None
		if self._tree_file:
			self._update_tree()
		log.info("done")