Files @ b0515ceb502d
Branch filter:

Location: Morevna/src/server.py - annotation

Laman
actually flushing the files
83dc5e1e183e
83dc5e1e183e
cd2ba192bf12
6c8e994fd906
d0161c81635b
d0161c81635b
6a0ab4fe9f5e
870c5c6c334f
34f4027c1bd6
ccbe369ce439
a4bfaabaeabb
34f4027c1bd6
34f4027c1bd6
75e070b6b447
6a0ab4fe9f5e
75e070b6b447
75e070b6b447
6a0ab4fe9f5e
5c80ca07f00c
e3c116b7dc5c
6a0ab4fe9f5e
0e02ff79165c
e3c116b7dc5c
922291d72c0f
cd2ba192bf12
34f4027c1bd6
6a0ab4fe9f5e
34f4027c1bd6
34f4027c1bd6
6c8e994fd906
6a0ab4fe9f5e
5c80ca07f00c
6a0ab4fe9f5e
34f4027c1bd6
5c80ca07f00c
5c80ca07f00c
5c80ca07f00c
cd2ba192bf12
026618d6681b
026618d6681b
026618d6681b
34f4027c1bd6
6c8e994fd906
5c80ca07f00c
b022997ba96d
b022997ba96d
5c80ca07f00c
5c80ca07f00c
b022997ba96d
b022997ba96d
6a0ab4fe9f5e
6a0ab4fe9f5e
b022997ba96d
6a0ab4fe9f5e
6c8e994fd906
6c8e994fd906
6c8e994fd906
75e070b6b447
6a0ab4fe9f5e
6a0ab4fe9f5e
5c80ca07f00c
6c8e994fd906
5c80ca07f00c
6c8e994fd906
6a0ab4fe9f5e
83dc5e1e183e
3f9fff4c9811
6c8e994fd906
5c80ca07f00c
b022997ba96d
6a0ab4fe9f5e
b022997ba96d
6c8e994fd906
3f9fff4c9811
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
b73a5d69a11b
6c8e994fd906
6a0ab4fe9f5e
5c80ca07f00c
6c8e994fd906
34f4027c1bd6
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
b0515ceb502d
b0515ceb502d
b0515ceb502d
362cff560740
83dc5e1e183e
6a0ab4fe9f5e
b73a5d69a11b
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
44cf81f3b6b8
6a0ab4fe9f5e
dad65188b1a0
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
6a0ab4fe9f5e
d0161c81635b
6a0ab4fe9f5e
dad65188b1a0
b73a5d69a11b
dad65188b1a0
6a0ab4fe9f5e
34f4027c1bd6
dad65188b1a0
34f4027c1bd6
6a0ab4fe9f5e
e3c116b7dc5c
5813971dbecc
5c80ca07f00c
44cf81f3b6b8
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
6a0ab4fe9f5e
44cf81f3b6b8
6a0ab4fe9f5e
e3c116b7dc5c
44cf81f3b6b8
6a0ab4fe9f5e
5c80ca07f00c
bb3b53ee15d6
6a0ab4fe9f5e
34f4027c1bd6
6a0ab4fe9f5e
b73a5d69a11b
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
6a0ab4fe9f5e
5c80ca07f00c
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
34f4027c1bd6
83dc5e1e183e
870c5c6c334f
b0515ceb502d
870c5c6c334f
870c5c6c334f
870c5c6c334f
83dc5e1e183e
83dc5e1e183e
87a9ced6e7b5
d0161c81635b
d0161c81635b
d0161c81635b
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
6a0ab4fe9f5e
d0161c81635b
from datetime import datetime
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hash_block
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
	def __init__(self, server_socket, ssl_context):
		super().__init__()

		sock, address = server_socket.accept()
		peer = sock.getpeername()
		try:
			self._socket = ssl_context.wrap_socket(sock, server_side=True)
		except (ssl.SSLError,OSError) as e:
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
			raise e

		log.info('Connected by {0}'.format(address))
		self.create_networkers()


class Miniserver:
	def __init__(self, filename, tree_file=""):
		self._filename = filename
		self._tree_file = tree_file

		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
		self._ssl.verify_mode = ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		p = None
		with self._ss:
			while True:
				try: connection = Connection(self._ss, self._ssl)
				except (ssl.SSLError, OSError): continue
				if p and p.is_alive():
					with connection as c:
						c[0].read_msg()
						c[1].write_msg({"command": "deny", "status":status.locked})
					continue
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._tree_file))
				p.start()


class Server(NetNode):
	def __init__(self, connection, filename, tree_file=""):
		super().__init__(filename, tree_file)
		(self._incoming, self._outcoming) = connection

		self.BLOCK_SIZE = self._tree.BLOCK_SIZE

		self._data_file_obj = None
		self._last_flushed = 0

	@staticmethod
	def run(connection, *args):
		with connection as c:
			s = Server(c, *args)
			s.serve()

	@property
	def _data_file(self):
		if not self._data_file_obj:
			self._data_file_obj = DataFile.open(self._filename, mode="rb+")
		return self._data_file_obj

	def serve(self):
		try:
			while self._serve_one(): pass
		except (AssertionError, ConnectionResetError) as e:
			log.warning(e)

	def _serve_one(self):
		json_data, bin_data = self._incoming.read_msg()

		if json_data["command"]=="init":
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
			if json_data["version"]<conf.lowest_compatible:
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
			if json_data["action"]=="pull" and self.is_locked():
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
			if json_data["action"]=="push" and not self.is_locked():
				try: self._lock()
				except Exception:
					pass

			self._last_flushed = datetime.now().timestamp()
			self._outcoming.write_msg({"command": "init", "version": conf.version})

		elif json_data["command"]=="req":
			if json_data["dataType"]=="data":
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
			else:
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))

		elif json_data["command"]=="send" and json_data["dataType"]=="data":
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))

		elif json_data["command"]=="end":
			self._finalize()
			if json_data.get("action")=="push": self._unlock()
			return False

		else:
			assert False, json_data["command"]

		return True

	def _request_hash(self, indices):
		log.info("received request for nodes {0}".format(indices))
		assert all(i<len(self._tree.store) for i in indices)
		hashes = [self._tree.store[i] for i in indices]

		json_response = {"command": "send", "index": indices, "dataType": "hash"}
		bin_response = b"".join(hashes)

		return (json_response, bin_response)

	def _request_data(self, index):
		log.info("received request for data blocks {0}".format(index))

		json_response = {"command": "send", "index": index, "dataType": "data"}
		blocks = []
		for i in index:
			blocks.append(self._data_file.read_from(i))

		return (json_response, b"".join(blocks))

	def _receive_data(self, json_data, bin_data):
		if not self.is_locked(): self._lock()
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))

		indices = json_data["index"]
		for (i, k) in enumerate(indices):
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._data_file.write_at(k, block)
			if self._tree_file:
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)

		t = datetime.now().timestamp()
		if t-self._last_flushed>=60:
			self._data_file.flush()
			if self._tree_file:
				self._update_tree()
			self._refresh_lock()
			self._last_flushed = t

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		log.info("closing session...")
		self._data_file.close()
		self._data_file_obj = None
		if self._tree_file:
			self._update_tree()
		log.info("done")