Files
@ b0515ceb502d
Branch filter:
Location: Morevna/src/server.py - annotation
b0515ceb502d
5.0 KiB
text/x-python
actually flushing the files
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | 83dc5e1e183e 83dc5e1e183e cd2ba192bf12 6c8e994fd906 d0161c81635b d0161c81635b 6a0ab4fe9f5e 870c5c6c334f 34f4027c1bd6 ccbe369ce439 a4bfaabaeabb 34f4027c1bd6 34f4027c1bd6 75e070b6b447 6a0ab4fe9f5e 75e070b6b447 75e070b6b447 6a0ab4fe9f5e 5c80ca07f00c e3c116b7dc5c 6a0ab4fe9f5e 0e02ff79165c e3c116b7dc5c 922291d72c0f cd2ba192bf12 34f4027c1bd6 6a0ab4fe9f5e 34f4027c1bd6 34f4027c1bd6 6c8e994fd906 6a0ab4fe9f5e 5c80ca07f00c 6a0ab4fe9f5e 34f4027c1bd6 5c80ca07f00c 5c80ca07f00c 5c80ca07f00c cd2ba192bf12 026618d6681b 026618d6681b 026618d6681b 34f4027c1bd6 6c8e994fd906 5c80ca07f00c b022997ba96d b022997ba96d 5c80ca07f00c 5c80ca07f00c b022997ba96d b022997ba96d 6a0ab4fe9f5e 6a0ab4fe9f5e b022997ba96d 6a0ab4fe9f5e 6c8e994fd906 6c8e994fd906 6c8e994fd906 75e070b6b447 6a0ab4fe9f5e 6a0ab4fe9f5e 5c80ca07f00c 6c8e994fd906 5c80ca07f00c 6c8e994fd906 6a0ab4fe9f5e 83dc5e1e183e 3f9fff4c9811 6c8e994fd906 5c80ca07f00c b022997ba96d 6a0ab4fe9f5e b022997ba96d 6c8e994fd906 3f9fff4c9811 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 b73a5d69a11b 6c8e994fd906 6a0ab4fe9f5e 5c80ca07f00c 6c8e994fd906 34f4027c1bd6 6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e b0515ceb502d b0515ceb502d b0515ceb502d 362cff560740 83dc5e1e183e 6a0ab4fe9f5e b73a5d69a11b 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 44cf81f3b6b8 6a0ab4fe9f5e dad65188b1a0 
6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 6a0ab4fe9f5e d0161c81635b 6a0ab4fe9f5e dad65188b1a0 b73a5d69a11b dad65188b1a0 6a0ab4fe9f5e 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 6a0ab4fe9f5e e3c116b7dc5c 5813971dbecc 5c80ca07f00c 44cf81f3b6b8 6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 6a0ab4fe9f5e 44cf81f3b6b8 6a0ab4fe9f5e e3c116b7dc5c 44cf81f3b6b8 6a0ab4fe9f5e 5c80ca07f00c bb3b53ee15d6 6a0ab4fe9f5e 34f4027c1bd6 6a0ab4fe9f5e b73a5d69a11b 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 6a0ab4fe9f5e 5c80ca07f00c 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 34f4027c1bd6 83dc5e1e183e 870c5c6c334f b0515ceb502d 870c5c6c334f 870c5c6c334f 870c5c6c334f 83dc5e1e183e 83dc5e1e183e 87a9ced6e7b5 d0161c81635b d0161c81635b d0161c81635b 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e 6a0ab4fe9f5e d0161c81635b | from datetime import datetime
import socket
import ssl
import multiprocessing
import logging as log
from hashtree import hash_block
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile
class Connection(BaseConnection):
    """Server-side connection: accepts one client and wraps it in TLS."""

    def __init__(self, server_socket, ssl_context):
        """Accept a pending client and perform the server-side TLS handshake.

        Args:
            server_socket: listening socket; ``accept()`` is called on it.
            ssl_context: ``ssl.SSLContext`` used to wrap the accepted socket.

        Raises:
            ssl.SSLError, OSError: accepting or the TLS handshake failed.
                Handshake failures are logged as a warning and re-raised.
        """
        super().__init__()
        # accept() already reports the peer address, so there is no need for
        # a separate getpeername() call that could itself fail (and bypass
        # the warning below) when the client has already gone away.
        sock, address = server_socket.accept()
        try:
            self._socket = ssl_context.wrap_socket(sock, server_side=True)
        except (ssl.SSLError, OSError):
            log.warning("Failed to establish an SSL connection from {0}.".format(address))
            # Release the raw socket in case wrap_socket() failed before
            # taking it over; closing an already detached socket is harmless.
            sock.close()
            raise
        log.info('Connected by {0}'.format(address))
        self.create_networkers()
class Miniserver:
    """Listening front end that hands each accepted client to a worker process.

    Only one worker runs at a time; clients connecting while it is alive are
    answered with a "deny" message carrying ``status.locked``.
    """

    def __init__(self, filename, tree_file=""):
        """Prepare the TLS context and start listening on ``conf.port``.

        Args:
            filename: passed through to every ``Server`` worker.
            tree_file: passed through to every ``Server`` worker.

        Raises:
            OSError: the listening socket could not be bound or put into
                listening mode.
        """
        self._filename = filename
        self._tree_file = tree_file

        # Clients must present a certificate verifiable against conf.peers.
        self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
        self._ssl.verify_mode = ssl.CERT_REQUIRED
        self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

        self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._ss.bind(("", conf.port))
            self._ss.listen(1)
        except OSError:
            # Do not leak the descriptor when the port is unavailable.
            self._ss.close()
            raise

    def serve(self):
        """Accept clients forever, serving at most one session at a time.

        Failed connection attempts are skipped. The listening socket is
        closed when the loop is left by an exception.
        """
        worker = None
        with self._ss:
            while True:
                try:
                    connection = Connection(self._ss, self._ssl)
                except (ssl.SSLError, OSError):
                    continue

                if worker is not None and worker.is_alive():
                    # A session is already running: read the client's opening
                    # message and turn it away. A misbehaving or vanished
                    # client must not take the whole listener down.
                    try:
                        with connection as c:
                            c[0].read_msg()
                            c[1].write_msg({"command": "deny", "status": status.locked})
                    except (AssertionError, OSError) as e:
                        log.warning(e)
                    continue

                worker = multiprocessing.Process(
                    target=Server.run,
                    args=(connection, self._filename, self._tree_file),
                )
                worker.start()
class Server(NetNode):
    """Handles one client session: answers hash/data requests and stores pushed blocks.

    Locking (``is_locked``, ``_lock``, ``_unlock``, ``_refresh_lock``), the
    hash tree (``_tree``, ``_new_leaves``, ``_update_tree``) and ``_filename``
    / ``_tree_file`` are not defined here; presumably they come from
    ``NetNode`` -- verify there.
    """

    def __init__(self, connection, filename, tree_file=""):
        """Bind the server to an opened connection.

        Args:
            connection: pair ``(incoming, outcoming)`` of message endpoints.
            filename: path of the data file; forwarded to ``NetNode``.
            tree_file: path of the hash tree file; forwarded to ``NetNode``.
        """
        super().__init__(filename, tree_file)
        (self._incoming, self._outcoming) = connection

        self.BLOCK_SIZE = self._tree.BLOCK_SIZE

        self._data_file_obj = None  # opened lazily by the _data_file property
        self._last_flushed = 0  # timestamp of the last flush, see _receive_data

    @staticmethod
    def run(connection, *args):
        """Process entry point: serve one session on ``connection``.

        Args:
            connection: context manager yielding the endpoint pair expected
                by ``Server.__init__``.
            *args: remaining ``Server.__init__`` arguments.
        """
        with connection as c:
            s = Server(c, *args)
            s.serve()

    @property
    def _data_file(self):
        """The data file, opened for reading and writing on first access."""
        if self._data_file_obj is None:
            self._data_file_obj = DataFile.open(self._filename, mode="rb+")
        return self._data_file_obj

    def serve(self):
        """Process messages until the session ends or the connection breaks."""
        try:
            while self._serve_one():
                pass
        except (AssertionError, ConnectionResetError) as e:
            log.warning(e)

    def _serve_one(self):
        """Read and handle a single message.

        Returns:
            True to keep serving, False when the session is over (the client
            sent "end" or its "init" request was denied).

        Raises:
            AssertionError: the command is not recognised.
        """
        json_data, bin_data = self._incoming.read_msg()
        command = json_data["command"]

        if command == "init":
            denial = None
            if json_data["blockSize"] != self.BLOCK_SIZE or json_data["blockCount"] != self._tree.leaf_count:
                denial = status.incompatible.parameters
            elif json_data["version"] < conf.lowest_compatible:
                denial = status.incompatible.version
            elif json_data["action"] == "pull" and self.is_locked():
                denial = status.locked
            elif json_data["action"] == "push" and not self.is_locked():
                try:
                    self._lock()
                except Exception as e:
                    # The concrete exception type of _lock() is not visible
                    # from here. Never continue a push without holding the
                    # lock; refuse the client instead.
                    log.warning("failed to acquire the lock: {0}".format(e))
                    denial = status.locked

            if denial is not None:
                # A denied client must get neither a lock nor a follow-up
                # "init" reply, so the session stops right here.
                self._outcoming.write_msg({"command": "deny", "status": denial})
                return False

            self._last_flushed = datetime.now().timestamp()
            self._outcoming.write_msg({"command": "init", "version": conf.version})

        elif command == "req":
            if json_data["dataType"] == "data":
                self._outcoming.write_msg(*self._request_data(json_data["index"]))
            else:
                self._outcoming.write_msg(*self._request_hash(json_data["index"]))

        elif command == "send" and json_data["dataType"] == "data":
            self._outcoming.write_msg(*self._receive_data(json_data, bin_data))

        elif command == "end":
            self._finalize()
            if json_data.get("action") == "push":
                self._unlock()
            return False

        else:
            assert False, json_data["command"]

        return True

    def _request_hash(self, indices):
        """Build the reply to a request for hash tree nodes.

        Args:
            indices: positions in ``self._tree.store``.

        Returns:
            ``(json_response, bin_response)`` where the binary part is the
            concatenation of the requested nodes.

        Raises:
            AssertionError: an index lies past the end of the store.
        """
        log.info("received request for nodes {0}".format(indices))
        assert all(i<len(self._tree.store) for i in indices)
        hashes = [self._tree.store[i] for i in indices]

        json_response = {"command": "send", "index": indices, "dataType": "hash"}
        bin_response = b"".join(hashes)

        return (json_response, bin_response)

    def _request_data(self, index):
        """Build the reply to a request for data blocks.

        Args:
            index: iterable of block numbers to read from the data file.

        Returns:
            ``(json_response, bin_response)`` where the binary part is the
            concatenation of the requested blocks.
        """
        log.info("received request for data blocks {0}".format(index))

        json_response = {"command": "send", "index": index, "dataType": "data"}
        blocks = [self._data_file.read_from(i) for i in index]

        return (json_response, b"".join(blocks))

    def _receive_data(self, json_data, bin_data):
        """Store pushed data blocks and acknowledge them.

        Args:
            json_data: message header; ``json_data["index"]`` lists the
                target block numbers.
            bin_data: the blocks themselves, ``BLOCK_SIZE`` bytes each, in
                the order given by the index list.

        Returns:
            A one-element tuple holding the "ack" message, so it can be
            unpacked into ``write_msg`` like the other replies.
        """
        if not self.is_locked():
            self._lock()
        log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))

        indices = json_data["index"]
        for (i, k) in enumerate(indices):
            block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
            self._data_file.write_at(k, block)
            if self._tree_file:
                self._new_leaves[k+self._tree.leaf_start] = hash_block(block)

        # Flush, update the tree and refresh the lock at most once every
        # 60 seconds rather than after every message.
        t = datetime.now().timestamp()
        if t-self._last_flushed >= 60:
            self._data_file.flush()
            if self._tree_file:
                self._update_tree()
            self._refresh_lock()
            self._last_flushed = t

        return ({"command": "ack", "index": indices},)

    def _finalize(self):
        """Close the data file (if it was opened) and write pending tree updates."""
        log.info("closing session...")

        # Use the backing attribute: going through the _data_file property
        # would open the file just to close it again.
        if self._data_file_obj is not None:
            self._data_file_obj.close()
            self._data_file_obj = None

        if self._tree_file:
            self._update_tree()

        log.info("done")
|