Files
@ 5c80ca07f00c
Branch filter:
Location: Morevna/src/server.py
5c80ca07f00c
4.6 KiB
text/x-python
reformatted whitespace with more respect for PEP-8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | import socket
import ssl
import multiprocessing
import logging as log
from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
import status
from datafile import DataFile
class Connection(BaseConnection):
    """Server-side TLS connection accepted from a listening socket."""

    def __init__(self, serverSocket, sslContext):
        """Accept one pending client and wrap its socket in TLS.

        Args:
            serverSocket: listening socket; ``accept()`` is called on it.
            sslContext: SSL context used to wrap the accepted socket in
                server mode.

        Raises:
            ssl.SSLError, OSError: if the peer is already gone or the TLS
                wrapping fails. A warning is logged and the accepted socket
                is closed before the exception propagates.
        """
        super().__init__()

        sock, address = serverSocket.accept()
        try:
            # Kept from the original as an early check that the peer is still
            # connected; now inside the try so a failure is logged and the
            # socket is released like any other setup error.
            sock.getpeername()
            self._socket = sslContext.wrap_socket(sock, server_side=True)
        except (ssl.SSLError, OSError):
            log.warning("Failed to establish an SSL connection from {0}.".format(address))
            # Harmless if wrap_socket() has already taken over the descriptor.
            sock.close()
            raise

        log.info('Connected by {0}'.format(address))
        self.createNetworkers()
class Miniserver:
    """Accept loop that hands TLS connections to at most one worker process.

    While a worker process is alive, further clients are answered with a
    "deny" message carrying ``status.locked``.
    """

    def __init__(self, filename, treeFile=""):
        """Prepare the SSL context and the listening socket.

        Args:
            filename: passed through to each ``Server`` worker.
            treeFile: passed through to each ``Server`` worker.

        Raises:
            OSError: if the listening socket cannot be bound; the socket is
                closed before the exception propagates.
        """
        self._filename = filename
        self._treeFile = treeFile

        # Client certificates are mandatory and validated against conf.peers.
        self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
        self._ssl.verify_mode = ssl.CERT_REQUIRED
        self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

        self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._ss.bind(("", conf.port))
            self._ss.listen(1)
        except OSError:
            self._ss.close()
            raise

    def serve(self):
        """Accept connections forever, one worker process at a time."""
        p = None
        with self._ss:
            while True:
                try:
                    connection = Connection(self._ss, self._ssl)
                except (ssl.SSLError, OSError):
                    # Connection already logged the failure; keep listening.
                    continue

                if p and p.is_alive():
                    self._denyBusy(connection)
                    continue

                p = multiprocessing.Process(
                    target=Server.run,
                    args=(connection, self._filename, self._treeFile),
                )
                p.start()

    @staticmethod
    def _denyBusy(connection):
        """Read the client's first message and refuse it with ``status.locked``.

        A misbehaving client must not take the accept loop down, so the same
        error types that ``Server.serve`` tolerates (plus other socket-level
        ``OSError``s) are logged and swallowed here.
        """
        try:
            with connection as c:
                c[0].readMsg()
                c[1].writeMsg({"command":"deny", "status":status.locked})
        except (AssertionError, OSError) as e:
            log.warning(e)
class Server(NetNode):
    """Handles the message exchange of a single client session.

    ``isLocked``, ``_lock``, ``_unlock``, ``_tree``, ``_newLeaves`` and
    ``_updateTree`` are not defined in this class; they are expected to come
    from ``NetNode``.
    """

    def __init__(self, connection, filename, treeFile=""):
        """Bind the handler to a connection.

        Args:
            connection: pair ``(incoming, outcoming)``; messages are read from
                the first element and written to the second.
            filename: path of the data file, opened lazily by ``_dataFile``.
            treeFile: forwarded to ``NetNode``; when truthy, received blocks
                are hashed and the tree is updated on finalize.
        """
        super().__init__(filename, treeFile)
        (self._incoming, self._outcoming) = connection

        self.BLOCK_SIZE = self._tree.BLOCK_SIZE

        self._dataFileObj = None

    @staticmethod
    def run(connection, *args):
        """Process entry point: serve one session on ``connection``.

        ``args`` are forwarded to the ``Server`` constructor.
        """
        with connection as c:
            s = Server(c, *args)
            s.serve()

    @property
    def _dataFile(self):
        """Data file handle, opened in ``rb+`` mode on first access."""
        if not self._dataFileObj:
            self._dataFileObj = DataFile.open(self._filename, mode="rb+")
        return self._dataFileObj

    def serve(self):
        """Handle messages until the session ends or the peer misbehaves."""
        try:
            while self._serveOne():
                pass
        except (AssertionError, ConnectionResetError) as e:
            log.warning(e)

    def _serveOne(self):
        """Read and answer a single message.

        Returns:
            True to keep serving, False when the session is over (an "end"
            command, or an "init" request that was denied).

        Raises:
            AssertionError: on an unknown command.
        """
        jsonData, binData = self._incoming.readMsg()
        command = jsonData["command"]

        if command == "init":
            return self._serveInit(jsonData)

        if command == "req":
            if jsonData["dataType"] == "data":
                response = self._requestData(jsonData["index"])
            else:
                response = self._requestHash(jsonData["index"])
            self._outcoming.writeMsg(*response)
            return True

        if command == "send" and jsonData["dataType"] == "data":
            self._outcoming.writeMsg(*self._receiveData(jsonData, binData))
            return True

        if command == "end":
            self._finalize()
            if jsonData.get("action") == "push":
                self._unlock()
            return False

        # Raised explicitly (instead of ``assert False``) so that it still
        # fires under ``python -O``; serve() catches AssertionError.
        raise AssertionError(command)

    def _serveInit(self, jsonData):
        """Answer an "init" request with either one "deny" or one "init".

        Previously a denial did not stop processing: the handler could send
        several denies, take the push lock and still confirm with "init".
        Now the first failing check sends a single "deny" and ends the
        session without locking.

        Returns:
            True if the session was accepted, False if it was denied.
        """
        if (jsonData["blockSize"] != self.BLOCK_SIZE
                or jsonData["blockCount"] != self._tree.leafCount):
            reason = status.incompatible.parameters
        elif jsonData["version"] < conf.lowestCompatible:
            reason = status.incompatible.version
        elif jsonData["action"] == "pull" and self.isLocked():
            reason = status.locked
        else:
            if jsonData["action"] == "push" and not self.isLocked():
                self._lock()
            self._outcoming.writeMsg({"command":"init", "version":conf.version})
            return True

        self._outcoming.writeMsg({"command":"deny", "status":reason})
        return False

    def _requestHash(self, indices):
        """Build the response carrying the tree hashes at ``indices``.

        Returns:
            ``(jsonResponse, binResponse)`` where the binary part is the
            concatenation of the requested hashes.

        Raises:
            AssertionError: if any index lies outside the tree store.
        """
        log.info("received request for nodes {0}".format(indices))
        store = self._tree.store
        # The lower bound matters too: a negative index would silently wrap
        # around and return an unrelated node.
        assert all(0 <= i < len(store) for i in indices)
        hashes = [store[i] for i in indices]

        jsonResponse = {"command":"send", "index":indices, "dataType":"hash"}
        binResponse = b"".join(hashes)

        return (jsonResponse, binResponse)

    def _requestData(self, index):
        """Build the response carrying the data blocks listed in ``index``.

        Returns:
            ``(jsonResponse, binResponse)`` where the binary part is the
            concatenation of the blocks read from the data file.
        """
        log.info("received request for data blocks {0}".format(index))

        jsonResponse = {"command":"send", "index":index, "dataType":"data"}
        dataFile = self._dataFile
        blocks = [dataFile.readFrom(i) for i in index]

        return (jsonResponse, b"".join(blocks))

    def _receiveData(self, jsonData, binData):
        """Write received blocks to the data file and acknowledge them.

        ``binData`` is sliced into consecutive ``BLOCK_SIZE`` chunks, one per
        entry of ``jsonData["index"]``. When a tree file is configured, the
        hash of every written block is recorded in ``_newLeaves``.

        Returns:
            One-element tuple holding the "ack" message.
        """
        if not self.isLocked():
            self._lock()
        log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))

        indices = jsonData["index"]
        blockSize = self.BLOCK_SIZE
        for (i, k) in enumerate(indices):
            block = binData[i*blockSize:(i+1)*blockSize]
            self._dataFile.writeAt(k, block)
            if self._treeFile:
                self._newLeaves[k+self._tree.leafStart] = hashBlock(block)

        return ({"command": "ack", "index": indices},)

    def _finalize(self):
        """Close the data file if it was opened and refresh the tree."""
        log.info("closing session...")

        # Go through the raw attribute: the _dataFile property would open the
        # file just so it could be closed again.
        if self._dataFileObj:
            self._dataFileObj.close()
        self._dataFileObj = None
        if self._treeFile:
            self._updateTree()

        log.info("done")
|