Files @ 13603ac261f1
Branch filter:

Location: Morevna/src/server.py - annotation

Laman
minor enhancements
8bb6a904d50b
cd2ba192bf12
6c8e994fd906
d0161c81635b
d0161c81635b
75e070b6b447
75e070b6b447
34f4027c1bd6
ccbe369ce439
a4bfaabaeabb
34f4027c1bd6
34f4027c1bd6
75e070b6b447
cd2ba192bf12
75e070b6b447
75e070b6b447
cd2ba192bf12
922291d72c0f
922291d72c0f
922291d72c0f
922291d72c0f
cd2ba192bf12
34f4027c1bd6
75e070b6b447
34f4027c1bd6
34f4027c1bd6
6c8e994fd906
8b0dc65400f3
026618d6681b
d0161c81635b
34f4027c1bd6
cd2ba192bf12
cd2ba192bf12
cd2ba192bf12
cd2ba192bf12
026618d6681b
026618d6681b
026618d6681b
34f4027c1bd6
6c8e994fd906
b022997ba96d
b022997ba96d
b022997ba96d
922291d72c0f
922291d72c0f
b022997ba96d
b022997ba96d
b022997ba96d
ccbe369ce439
b022997ba96d
b022997ba96d
6c8e994fd906
6c8e994fd906
6c8e994fd906
75e070b6b447
6c8e994fd906
75e070b6b447
6c8e994fd906
6c8e994fd906
6c8e994fd906
6c8e994fd906
a4bfaabaeabb
3f9fff4c9811
6c8e994fd906
b022997ba96d
b022997ba96d
b022997ba96d
b022997ba96d
6c8e994fd906
3f9fff4c9811
3f9fff4c9811
a4bfaabaeabb
a4bfaabaeabb
a4bfaabaeabb
34f4027c1bd6
b73a5d69a11b
6c8e994fd906
6c8e994fd906
9f2b0a4f3538
6c8e994fd906
34f4027c1bd6
6c8e994fd906
6c8e994fd906
34f4027c1bd6
dad65188b1a0
362cff560740
ccbe369ce439
ccbe369ce439
ccbe369ce439
362cff560740
ccbe369ce439
b022997ba96d
b022997ba96d
362cff560740
362cff560740
b73a5d69a11b
dad65188b1a0
44cf81f3b6b8
6c8e994fd906
44cf81f3b6b8
6c8e994fd906
dad65188b1a0
dad65188b1a0
6c8e994fd906
34f4027c1bd6
dad65188b1a0
d0161c81635b
362cff560740
dad65188b1a0
b73a5d69a11b
dad65188b1a0
dad65188b1a0
34f4027c1bd6
dad65188b1a0
34f4027c1bd6
5813971dbecc
5813971dbecc
5813971dbecc
5813971dbecc
44cf81f3b6b8
5813971dbecc
5813971dbecc
34f4027c1bd6
44cf81f3b6b8
44cf81f3b6b8
44cf81f3b6b8
44cf81f3b6b8
44cf81f3b6b8
44cf81f3b6b8
bb3b53ee15d6
bb3b53ee15d6
a4bfaabaeabb
34f4027c1bd6
bb3b53ee15d6
b73a5d69a11b
b73a5d69a11b
362cff560740
b73a5d69a11b
34f4027c1bd6
87a9ced6e7b5
87a9ced6e7b5
4f3ff4c311f2
a4bfaabaeabb
87a9ced6e7b5
4f3ff4c311f2
34f4027c1bd6
87a9ced6e7b5
d0161c81635b
d0161c81635b
d0161c81635b
3f9fff4c9811
a4bfaabaeabb
d0161c81635b
75e070b6b447
d0161c81635b
import socket
import ssl
import multiprocessing
import logging as log

from hashtree import hashBlock
from netnode import BaseConnection,NetNode
import config as conf
import status
from datafile import DataFile


class Connection(BaseConnection):
	def __init__(self,serverSocket,sslContext):
		super().__init__()

		sock, address = serverSocket.accept()
		try: self._socket=sslContext.wrap_socket(sock,server_side=True)
		except ssl.SSLError as e:
			log.warning("Failed to establish an SSL connection from {0}.".format(sock.getpeername()))
			raise e

		log.info('Connected by {0}'.format(address))
		self.createNetworkers()


class Miniserver:
	def __init__(self,filename,treeFile=""):
		self._filename=filename
		self._treeFile=treeFile

		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
		self._ssl.verify_mode=ssl.CERT_REQUIRED
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)

		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self._ss.bind(("", conf.port))
		self._ss.listen(1)

	def serve(self):
		p=None
		with self._ss:
			while True:
				try: connection=Connection(self._ss,self._ssl)
				except ssl.SSLError: continue
				if p and p.is_alive():
					with connection as c:
						c[0].readMsg()
						c[1].writeMsg({"command":"deny","status":status.locked})
					continue
				p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
				p.start()


class Server(NetNode):
	def __init__(self,connection,filename,treeFile=""):
		super().__init__(filename,treeFile)
		(self._incoming,self._outcoming)=connection

		self.BLOCK_SIZE=self._tree.BLOCK_SIZE

		self._dataFileObj=None

	@staticmethod
	def run(connection,*args):
		with connection as c:
			s=Server(c,*args)
			s.serve()

	@property
	def _dataFile(self):
		if not self._dataFileObj:
			self._dataFileObj=DataFile.open(self._filename, mode="rb+")
		return self._dataFileObj

	def serve(self):
		try:
			while self._serveOne(): pass
		except (AssertionError,ConnectionResetError) as e:
			log.warning(e)

	def _serveOne(self):
		jsonData,binData=self._incoming.readMsg()

		if jsonData["command"]=="init":
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters})
			if jsonData["version"]<conf.lowestCompatible:
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.version})
			if jsonData["action"]=="pull" and self.isLocked():
				self._outcoming.writeMsg({"command":"deny","status":status.locked})
			if jsonData["action"]=="push" and not self.isLocked():
				self._lock()

			self._outcoming.writeMsg({"command":"init", "version":conf.version})

		elif jsonData["command"]=="req":
			if jsonData["dataType"]=="data":
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
			else:
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))

		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))

		elif jsonData["command"]=="end":
			self._finalize()
			if jsonData.get("action")=="push": self._unlock()
			return False

		else:
			assert False, jsonData["command"]

		return True

	def _requestHash(self,indices):
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]

		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)

		return (jsonResponse,binResponse)

	def _requestData(self,index):
		log.info("received request for data block #{0}".format(index))

		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		blocks=[]
		for i in index:
			blocks.append(self._dataFile.readFrom(i))

		return (jsonResponse,b"".join(blocks))

	def _receiveData(self,jsonData,binData):
		if not self.isLocked(): self._lock()
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))

		indices=jsonData["index"]
		for (i,k) in enumerate(indices):
			block=binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
			self._dataFile.writeAt(k,block)
			if self._treeFile:
				self._newLeaves[k+self._tree.leafStart]=hashBlock(block)

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileObj=None
		if self._treeFile:
			self._updateTree()
		log.info("done")