import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection, NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
    """Accepts one client on serverSocket and wraps it in TLS."""

    def __init__(self, serverSocket, sslContext):
        super().__init__()

        sock, address = serverSocket.accept()
        try:
            self._socket = sslContext.wrap_socket(sock, server_side=True)
        except ssl.SSLError:
            # Log the address returned by accept(): wrap_socket() detaches
            # sock, so sock.getpeername() is not reliable after a failure.
            log.warning("Failed to establish an SSL connection from {0}.".format(address))
            raise
        log.info("Connected by {0}".format(address))
        self.createNetworkers()


class Miniserver:
    """Listens on conf.port and hands each accepted client to a Server process."""

    def __init__(self, filename, treeFile=""):
        self._filename = filename
        self._treeFile = treeFile

        # Require a client certificate signed by one of the known peers.
        self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
        self._ssl.verify_mode = ssl.CERT_REQUIRED
        self._ssl.load_cert_chain(conf.certfile, conf.keyfile)

        self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._ss.bind(("", conf.port))
        self._ss.listen(1)

    def serve(self):
        p = None
        with self._ss:
            while True:
                try:
                    connection = Connection(self._ss, self._ssl)
                except ssl.SSLError:
                    continue
                # Only one worker at a time: deny the client while the
                # previous session is still running.
                if p and p.is_alive():
                    with connection as c:
                        c[0].readMsg()
                        c[1].writeMsg({"command": "deny", "status": status.locked})
                    continue
                p = multiprocessing.Process(
                    target=Server.run,
                    args=(connection, self._filename, self._treeFile),
                )
                p.start()


class Server(NetNode):
    def __init__(self, connection, filename, treeFile=""):
        super().__init__(filename, treeFile)
        (self._incoming, self._outcoming) = connection

        self.BLOCK_SIZE = self._tree.BLOCK_SIZE

        self._dataFileObj = None

    @staticmethod
    def run(connection, *args):
        with connection as c:
            s = Server(c, *args)
            s.serve()

    @property
    def _dataFile(self):
        # Opened lazily on first access.
        if not self._dataFileObj:
            self._dataFileObj = DataFile.open(self._filename, mode="rb+")
        return self._dataFileObj

    def serve(self):
        try:
            while self._serveOne():
                pass
        except (AssertionError, ConnectionResetError) as e:
            log.warning(e)

    def _serveOne(self):
        """Handle one message; return False once the session has ended."""
        jsonData, binData = self._incoming.readMsg()

        if jsonData["command"] == "init":
            # NOTE: a deny reply does not return early here; the init reply
            # at the end of this branch is still sent afterwards.
            if jsonData["blockSize"] != self.BLOCK_SIZE or jsonData["blockCount"] != self._tree.leafCount:
                self._outcoming.writeMsg({"command": "deny", "status": status.incompatible.parameters})
            if jsonData["version"] < conf.lowestCompatible:
                self._outcoming.writeMsg({"command": "deny", "status": status.incompatible.version})
            if jsonData["action"] == "pull" and self.isLocked():
                self._outcoming.writeMsg({"command": "deny", "status": status.locked})
            if jsonData["action"] == "push" and not self.isLocked():
                self._lock()

            self._outcoming.writeMsg({"command": "init", "version": conf.version})

        elif jsonData["command"] == "req":
            if jsonData["dataType"] == "data":
                self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
            else:
                self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

        elif jsonData["command"] == "send" and jsonData["dataType"] == "data":
            self._outcoming.writeMsg(*self._receiveData(jsonData, binData))

        elif jsonData["command"] == "end":
            self._finalize()
            if jsonData.get("action") == "push":
                self._unlock()
            return False

        else:
            assert False, jsonData["command"]

        return True

    def _requestHash(self, indices):
        log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
        assert all(i < len(self._tree.store) for i in indices)
        hashes = [self._tree.store[i] for i in indices]

        jsonResponse = {"command": "send", "index": indices, "dataType": "hash"}
        binResponse = b"".join(hashes)

        return (jsonResponse, binResponse)

    def _requestData(self, index):
        log.info("received request for data block #{0}".format(index))

        jsonResponse = {"command": "send", "index": index, "dataType": "data"}
        blocks = []
        for i in index:
            blocks.append(self._dataFile.readFrom(i))

        return (jsonResponse, b"".join(blocks))

    def _receiveData(self, jsonData, binData):
        if not self.isLocked():
            self._lock()
        log.info("received data block #{0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))

        indices = jsonData["index"]
        # binData holds the blocks back to back, in the order given by indices.
        for (i, k) in enumerate(indices):
            block = binData[i * self.BLOCK_SIZE:(i + 1) * self.BLOCK_SIZE]
            self._dataFile.writeAt(k, block)
            if self._treeFile:
                self._newLeaves[k + self._tree.leafStart] = hashBlock(block)

        return ({"command": "ack", "index": indices},)

    def _finalize(self):
        log.info("closing session...")
        self._dataFile.close()
        self._dataFileObj = None
        if self._treeFile:
            self._updateTree()
        log.info("done")
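

# --- Illustrative sketch, not part of the original module --------------------
# _receiveData above cuts one binary payload into fixed-size blocks, one per
# entry of jsonData["index"]. The helper below shows that slicing in isolation.
# The names split_blocks and block_size are hypothetical, and the sketch assumes
# the blocks arrive back to back, with only the last one possibly shorter.
def split_blocks(bin_data, indices, block_size):
    """Pair each block index with its slice of bin_data."""
    return [
        (k, bin_data[i * block_size:(i + 1) * block_size])
        for i, k in enumerate(indices)
    ]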