Files
@ 13603ac261f1
Branch filter:
Location: Morevna/src/server.py - annotation
13603ac261f1
4.5 KiB
text/x-python
minor enhancements
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | 8bb6a904d50b cd2ba192bf12 6c8e994fd906 d0161c81635b d0161c81635b 75e070b6b447 75e070b6b447 34f4027c1bd6 ccbe369ce439 a4bfaabaeabb 34f4027c1bd6 34f4027c1bd6 75e070b6b447 cd2ba192bf12 75e070b6b447 75e070b6b447 cd2ba192bf12 922291d72c0f 922291d72c0f 922291d72c0f 922291d72c0f cd2ba192bf12 34f4027c1bd6 75e070b6b447 34f4027c1bd6 34f4027c1bd6 6c8e994fd906 8b0dc65400f3 026618d6681b d0161c81635b 34f4027c1bd6 cd2ba192bf12 cd2ba192bf12 cd2ba192bf12 cd2ba192bf12 026618d6681b 026618d6681b 026618d6681b 34f4027c1bd6 6c8e994fd906 b022997ba96d b022997ba96d b022997ba96d 922291d72c0f 922291d72c0f b022997ba96d b022997ba96d b022997ba96d ccbe369ce439 b022997ba96d b022997ba96d 6c8e994fd906 6c8e994fd906 6c8e994fd906 75e070b6b447 6c8e994fd906 75e070b6b447 6c8e994fd906 6c8e994fd906 6c8e994fd906 6c8e994fd906 a4bfaabaeabb 3f9fff4c9811 6c8e994fd906 b022997ba96d b022997ba96d b022997ba96d b022997ba96d 6c8e994fd906 3f9fff4c9811 3f9fff4c9811 a4bfaabaeabb a4bfaabaeabb a4bfaabaeabb 34f4027c1bd6 b73a5d69a11b 6c8e994fd906 6c8e994fd906 9f2b0a4f3538 6c8e994fd906 34f4027c1bd6 6c8e994fd906 6c8e994fd906 34f4027c1bd6 dad65188b1a0 362cff560740 ccbe369ce439 ccbe369ce439 ccbe369ce439 362cff560740 ccbe369ce439 b022997ba96d b022997ba96d 362cff560740 362cff560740 b73a5d69a11b dad65188b1a0 44cf81f3b6b8 6c8e994fd906 44cf81f3b6b8 6c8e994fd906 dad65188b1a0 dad65188b1a0 6c8e994fd906 34f4027c1bd6 dad65188b1a0 d0161c81635b 362cff560740 dad65188b1a0 b73a5d69a11b dad65188b1a0 dad65188b1a0 34f4027c1bd6 dad65188b1a0 34f4027c1bd6 5813971dbecc 5813971dbecc 5813971dbecc 5813971dbecc 44cf81f3b6b8 5813971dbecc 5813971dbecc 34f4027c1bd6 44cf81f3b6b8 44cf81f3b6b8 44cf81f3b6b8 44cf81f3b6b8 44cf81f3b6b8 44cf81f3b6b8 bb3b53ee15d6 bb3b53ee15d6 a4bfaabaeabb 34f4027c1bd6 bb3b53ee15d6 b73a5d69a11b b73a5d69a11b 362cff560740 b73a5d69a11b 34f4027c1bd6 87a9ced6e7b5 87a9ced6e7b5 4f3ff4c311f2 a4bfaabaeabb 87a9ced6e7b5 4f3ff4c311f2 34f4027c1bd6 87a9ced6e7b5 d0161c81635b d0161c81635b d0161c81635b 3f9fff4c9811 a4bfaabaeabb d0161c81635b 75e070b6b447 d0161c81635b | import socket
import ssl
import multiprocessing
import logging as log
from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
import status
from datafile import DataFile
class Connection(BaseConnection):
def __init__(self,serverSocket,sslContext):
super().__init__()
sock, address = serverSocket.accept()
try: self._socket=sslContext.wrap_socket(sock,server_side=True)
except ssl.SSLError as e:
log.warning("Failed to establish an SSL connection from {0}.".format(sock.getpeername()))
raise e
log.info('Connected by {0}'.format(address))
self.createNetworkers()
class Miniserver:
def __init__(self,filename,treeFile=""):
self._filename=filename
self._treeFile=treeFile
self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
self._ssl.verify_mode=ssl.CERT_REQUIRED
self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._ss.bind(("", conf.port))
self._ss.listen(1)
def serve(self):
p=None
with self._ss:
while True:
try: connection=Connection(self._ss,self._ssl)
except ssl.SSLError: continue
if p and p.is_alive():
with connection as c:
c[0].readMsg()
c[1].writeMsg({"command":"deny","status":status.locked})
continue
p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
p.start()
class Server(NetNode):
def __init__(self,connection,filename,treeFile=""):
super().__init__(filename,treeFile)
(self._incoming,self._outcoming)=connection
self.BLOCK_SIZE=self._tree.BLOCK_SIZE
self._dataFileObj=None
@staticmethod
def run(connection,*args):
with connection as c:
s=Server(c,*args)
s.serve()
@property
def _dataFile(self):
if not self._dataFileObj:
self._dataFileObj=DataFile.open(self._filename, mode="rb+")
return self._dataFileObj
def serve(self):
try:
while self._serveOne(): pass
except (AssertionError,ConnectionResetError) as e:
log.warning(e)
def _serveOne(self):
jsonData,binData=self._incoming.readMsg()
if jsonData["command"]=="init":
if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters})
if jsonData["version"]<conf.lowestCompatible:
self._outcoming.writeMsg({"command":"deny","status":status.incompatible.version})
if jsonData["action"]=="pull" and self.isLocked():
self._outcoming.writeMsg({"command":"deny","status":status.locked})
if jsonData["action"]=="push" and not self.isLocked():
self._lock()
self._outcoming.writeMsg({"command":"init", "version":conf.version})
elif jsonData["command"]=="req":
if jsonData["dataType"]=="data":
self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
else:
self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
elif jsonData["command"]=="end":
self._finalize()
if jsonData.get("action")=="push": self._unlock()
return False
else:
assert False, jsonData["command"]
return True
def _requestHash(self,indices):
log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
assert all(i<len(self._tree.store) for i in indices)
hashes=[self._tree.store[i] for i in indices]
jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
binResponse=b"".join(hashes)
return (jsonResponse,binResponse)
def _requestData(self,index):
log.info("received request for data block #{0}".format(index))
jsonResponse={"command":"send", "index":index, "dataType":"data"}
blocks=[]
for i in index:
blocks.append(self._dataFile.readFrom(i))
return (jsonResponse,b"".join(blocks))
def _receiveData(self,jsonData,binData):
if not self.isLocked(): self._lock()
log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
indices=jsonData["index"]
for (i,k) in enumerate(indices):
block=binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
self._dataFile.writeAt(k,block)
if self._treeFile:
self._newLeaves[k+self._tree.leafStart]=hashBlock(block)
return ({"command": "ack", "index": indices},)
def _finalize(self):
log.info("closing session...")
self._dataFile.close()
self._dataFileObj=None
if self._treeFile:
self._updateTree()
log.info("done")
|