Changeset - 3f9fff4c9811
[Not reviewed]
default
0 3 0
Laman - 7 years ago 2017-10-10 17:02:35

fixed long running server, fixed logging
3 files changed with 18 insertions and 8 deletions:
0 comments (0 inline, 0 general)
src/config.py
Show inline comments
 
import os
 
import logging as log
 
from logging.handlers import TimedRotatingFileHandler
 

	
 

	
 
log.basicConfig(
 
	level=log.INFO,format="%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S"
 
)
 
logger=log.getLogger()
 
logger.addHandler(TimedRotatingFileHandler("/var/log/morevna.log",when="midnight",backupCount=9))
 
logger.setLevel(log.INFO)
 
formatter=log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
 
handler=TimedRotatingFileHandler("/var/log/morevna/mor.log",when="midnight",backupCount=9)
 
handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 

	
 
version=0
 

	
 
hosts=["127.0.0.1"]
 
port=9901
 

	
 
directory=os.path.join(os.path.dirname(__file__),"..")
 
certfile=os.path.join(directory,"certs/cert.pem")
 
keyfile=os.path.join(directory,"certs/key.pem")
 
peers=os.path.join(directory,"certs/peers.pem")
src/morevna.py
Show inline comments
 
import sys
 
import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client
 
from server import Server
 

	
 

	
 
def _checkFile(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:",f,file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
 
	_checkFile(args.datafile)
 

	
 
	tree=HashTree.fromFile(args.datafile)
 
	tree.save(args.treefile)
 

	
 
def push(args):
 
	_checkFile(args.datafile)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile)
 
	blocksToTransfer=c.negotiate()
 
	c.sendData(blocksToTransfer)
 
	print()
 
	print(stats.report())
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Server(args.datafile,args.tree)
 
	try:
 
	spawnDaemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s",e)
 

	
 

	
 
parser=ArgumentParser()
 
subparsers=parser.add_subparsers()
 

	
 
pBuild=subparsers.add_parser("build")
 
pBuild.add_argument("treefile", help="stored hash tree location")
 
pBuild.add_argument("datafile")
 
pBuild.set_defaults(func=buildTree)
 

	
 
pUpdate=subparsers.add_parser("push")
 
pUpdate.add_argument("-p","--port",type=int)
 
pUpdate.add_argument("--host",default="127.0.0.1")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=push)
 

	
 
pServe=subparsers.add_parser("serve")
 
pServe.add_argument("-p","--port",type=int)
 
pServe.add_argument("--host",default="")
 
pServe.add_argument("-t","--tree",help="stored hash tree location")
 
pServe.add_argument("datafile")
 
pServe.set_defaults(func=serve)
 

	
 
args=parser.parse_args()
 
args.func(args)
src/server.py
Show inline comments
 
@@ -6,132 +6,136 @@ import logging as log
 
from hashtree import HashTree
 
from networkers import NetworkReader,NetworkWriter
 
import config as conf
 

	
 

	
 
class Connection:
 
	def __init__(self,serverSocket,sslContext):
 
		sock, address = serverSocket.accept()
 
		self._socket=sslContext.wrap_socket(sock,server_side=True)
 

	
 
		log.info('Connected by {0}'.format(address))
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		self._socket.shutdown(socket.SHUT_RDWR)
 
		self._socket.close()
 

	
 

	
 
class Server:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 
		self._locked=False
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
		self._lastIndex=-1
 
		self._dataFile=None
 
		self._dataFileHandle=None
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
			self._dataFileHandle=open(self._filename, mode="rb+")
 
		return self._dataFileHandle
 

	
 
	def serve(self):
 
		while True:
 
			with Connection(self._ss,self._ssl) as (incoming, outcoming):
 
				try:
 
					while self._serveOne(incoming,outcoming):
 
						pass
 
				except AssertionError:
 
					continue
 

	
 
	def _serveOne(self,incoming,outcoming):
 
		jsonData,binData=incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			self._locked=True
 
			outcoming.writeMsg({"command": "ack"})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			self._locked=False
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,index):
 
		log.info("received request for node #{0}".format(index))
 
		assert index<len(self._tree.store)
 
		nodeHash=self._tree.store[index]
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"hash"}
 
		binResponse=nodeHash
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		if self._lastIndex+1!=index:
 
			self._dataFile.seek(index*self.BLOCK_SIZE)
 
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		if not self._dataFile:
 
			self._dataFile=open(self._filename, mode="rb+")
 
		i=jsonData["index"]
 
		if self._lastIndex+1!=i:
 
			self._dataFile.seek(i*self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastIndex=i
 
		if self._treeFile:
 
			self._newLeaves[i+self._tree.leafStart]=hashlib.sha256(binData).digest()[HashTree.HASH_LEN:]
 

	
 
		return ({"command": "ack", "index": i},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		if self._dataFile:
 
			self._dataFile.close()
 
		self._dataFileHandle=None
 
		if self._treeFile:
 
			log.info("updating hash tree...")
 
			for (k,v) in self._newLeaves.items():
 
				self._tree.updateLeaf(k, v)
 
			self._tree.save(self._treeFile)
 
		log.info("done")
0 comments (0 inline, 0 general)