Files
        @ 02ea4fed2520
    
        
              Branch filter: 
        
    Location: Morevna/src/server.py
        
            
            02ea4fed2520
            3.6 KiB
            text/x-python
        
        
    
    fixes, error handling
    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136  | import socket
import ssl
import multiprocessing
import logging as log
from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
class Connection(BaseConnection):
	def __init__(self,serverSocket,sslContext):
		super().__init__()
		sock, address = serverSocket.accept()
		self._socket=sslContext.wrap_socket(sock,server_side=True)
		log.info('Connected by {0}'.format(address))
		self.createNetworkers()
class Miniserver:
	def __init__(self,filename,treeFile=""):
		self._filename=filename
		self._treeFile=treeFile
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
		self._ssl.verify_mode=ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)
	def serve(self):
		while True:
			with Connection(self._ss,self._ssl) as c:
				p=multiprocessing.Process(target=Server.run,args=(c,self._filename,self._treeFile))
				p.start()
				p.join()
class Server(NetNode):
	def __init__(self,connection,filename,treeFile=""):
		super().__init__(filename,treeFile)
		(self._incoming,self._outcoming)=connection
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
		self._lastIndex=-1
		self._dataFileHandle=None
	@staticmethod
	def run(*args):
		s=Server(*args)
		s.serve()
	@property
	def _dataFile(self):
		if not self._dataFileHandle:
			self._dataFileHandle=open(self._filename, mode="rb+")
		return self._dataFileHandle
	def serve(self):
		try:
			while self._serveOne(): pass
		except (AssertionError,ConnectionResetError) as e:
			log.warning(e)
	def _serveOne(self):
		jsonData,binData=self._incoming.readMsg()
		if jsonData["command"]=="init":
			assert jsonData["blockSize"]==self.BLOCK_SIZE
			assert jsonData["blockCount"]==self._tree.leafCount
			self._outcoming.writeMsg({"command": "ack"})
		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
		elif jsonData["command"]=="end":
			self._finalize()
			self._locked=False
			return False
		else:
			assert False, jsonData["command"]
		return True
	def _requestHash(self,indices):
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]
		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)
		return (jsonResponse,binResponse)
	def _requestData(self,index):
		log.info("received request for data block #{0}".format(index))
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		if self._lastIndex+1!=index:
			self._dataFile.seek(index*self.BLOCK_SIZE)
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
		return (jsonResponse,binResponse)
	def _receiveData(self,jsonData,binData):
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
		i=jsonData["index"]
		if self._lastIndex+1!=i:
			self._dataFile.seek(i*self.BLOCK_SIZE)
		self._dataFile.write(binData)
		self._lastIndex=i
		if self._treeFile:
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)
		return ({"command": "ack", "index": i},)
	def _finalize(self):
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileHandle=None
		if self._treeFile:
			self._updateTree()
		log.info("done")
 |