Changeset - ccbe369ce439
[Not reviewed]
default
0 6 1
Laman - 7 years ago 2018-01-19 01:17:36

checking version compatibility
7 files changed with 28 insertions and 10 deletions:
0 comments (0 inline, 0 general)
protocol.md
Show inline comments
 
# Protocol #
 

	
 
[Communication protocol proposal. Not yet fully implemented and expected to change.]
 
[Communication protocol proposal. Not yet fully implemented and still expected to change.]
 

	
 
Nodes communicate by exchanging messages through network sockets. Basic message format is simple:
 

	
 
		json-length: ABC
 
		bin-length: DEF
 
		{... JSON payload ...}
 
		... binary payload ...
 

	
 
`ABC` is the JSON payload length (decimal integer), `DEF` is the binary payload length. JSON payload always contains a "command" field.
 

	
 
The client sends a request (a message) and the server answers with a response (another message).
 

	
 

	
 
## Commands ##
 
### <a name="init"></a>init ###
 
Initiates communication between the client and the server. Checks that the nodes can meaningfully talk to each other.
 

	
 
#### Params ####
 
- (str) version: program version. Both the client and the server are to check for compatibility with their partner.
 
- [(int) major, (int) minor, (int) tiny] version version: program version. Both the client and the server are to check for compatibility with their partner.
 
- {push, pull} action: what the client desires to do
 
- (str) filename: name of the file to be synchronized
 
- (int) blockSize
 
- (int) blockCount
 

	
 
#### Responses ####
 
- [init](#r-init)
 
- [deny](#deny)
 

	
 

	
 
### <a name="req"></a>req ###
 
Requests hashes or a data chunk from the server.
 

	
 
#### Params ####
 
- {hash, data} dataType
 

	
 
hash
 
- [(int) i0, ...] index: list of requested node keys from the HashTree
 

	
 
data
 
- (int) index: requested data block number in the data file. Returns blockSize bytes starting at blockSize * index.
 

	
 
#### Responses ####
 
- [send](#send)
 
@@ -53,44 +53,45 @@ Sends data. Can be a client action or a 
 
#### Params ####
 
- {hash, data} dataType
 
- index: see [req](#req)
 

	
 
#### Responses ####
 
- [ack](#ack)
 
- [err](#err)
 

	
 

	
 
### <a name="end"></a>end ###
 
Cease talking and close the connection.
 

	
 
#### Params ####
 
- {push} (optional) action: unlocks the server's dirty lock
 

	
 
#### Responses ####
 
- end
 

	
 

	
 
## Responses ##
 
### <a name="r-init"></a>init ###
 
See [the init command](#init). If the client doesn't like server's protocol version, it is free to [end](#end) the connection.
 

	
 
#### Params ####
 
- (str) version
 
- [(int) major, (int) minor, (int) tiny] version
 

	
 

	
 
### <a name="deny"></a>deny ###
 
The server refuses connection. It might be busy, corrupted or incompatible.
 

	
 
#### Params ####
 
- (int) retry: suggested delay in seconds before another attempt at connection
 
- (int) status
 
- (str) msg
 

	
 

	
 
### <a name="ack"></a>ack ###
 
Everything is alright, but the server has nothing better to say.
 

	
 

	
 
### <a name="err"></a>err ###
 
Something bad happened.
 

	
 
#### Params ####
 
- (str) msg
src/client.py
Show inline comments
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import status
 
import stats
 
from util import Progress
 
from hashtree import HashTree,hashBlock
 
from netnode import BaseConnection,NetNode,FailedConnection,LockedException
 
from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException
 

	
 

	
 
class DeniedConnection(Exception): pass
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,host,port):
 
		super().__init__()
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 

	
 
		try:
 
			self._socket.connect((host,port))
 
		except (ConnectionRefusedError,OSError) as e:
 
			log.exception(e)
 
			print("Couldn't connect to {0}:{1}".format(host,port))
 
			raise FailedConnection()
 
		except ssl.SSLError as e:
 
			log.exception(e)
 
			print("Error creating SSL connection to {0}:{1}".format(host,port))
 
			raise FailedConnection()
 

	
 
		self.createNetworkers()
 
		print("Connected to {0}".format(host))
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self,filename,treeFile=""):
 
		print(datetime.now(), "initializing...")
 
		super().__init__(filename,treeFile)
 

	
 
	def init(self,action):
 
		jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		if jsonData["command"]=="deny":
 
			if jsonData["status"]==status.incompatible.version:
 
				raise DeniedConnection("Incompatible client version. Consider upgrading it.")
 
			raise DeniedConnection()
 
		assert jsonData["command"]=="init"
 
		if jsonData["version"]<conf.lowestCompatible:
 
			raise IncompatibleException("Incompatible server version. Consider upgrading it.")
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree=self._tree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			indices=[]
 
			for i in range(conf.batchSize):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 

	
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
src/config.py
Show inline comments
 
import os
 
import json
 
import logging as log
 
from logging.handlers import TimedRotatingFileHandler
 

	
 

	
 
logger=log.getLogger()
 
logger.setLevel(log.INFO)
 
formatter=log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
 
handler=TimedRotatingFileHandler("/var/log/morevna/mor.log",when="midnight",backupCount=9)
 
handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 
directory=os.path.join(os.path.dirname(__file__),"..")
 
certfile=os.path.join(directory,"certs/cert.pem")
 
keyfile=os.path.join(directory,"certs/key.pem")
 
peers=os.path.join(directory,"certs/peers.pem")
 

	
 
configFile=os.path.join(directory,"config.json")
 
conf=dict()
 
if os.path.isfile(configFile):
 
	with open(configFile) as f: conf=json.load(f)
 

	
 
version=(0,0,0)
 
version=[0,0,1]
 
lowestCompatible=[0,0,0] # tuple is more fitting but json conversion transforms it into a list anyway
 

	
 
hosts=conf.get("hosts",["127.0.0.1"])
 
port=conf.get("port",9901)
 

	
 
batchSize=conf.get("batchSize",256)
src/morevna.py
Show inline comments
 
@@ -29,71 +29,73 @@ def buildTree(args):
 
	tree=HashTree.fromFile(args.datafile)
 
	tree.save(args.treefile)
 

	
 
def push(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts=[args.host]
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile,args.tree)
 
	for h in conf.hosts:
 
		host=splitHost(h,conf.port)
 
		stats.reset()
 
		try:
 
			with ClientConnection(*host) as con:
 
				c.setConnection(con)
 
				c.init("push")
 
				blocksToTransfer=c.negotiate()
 
				c.sendData(blocksToTransfer)
 
			print()
 
			print(stats.report())
 
			print()
 
		except FailedConnection: pass
 
		except DeniedConnection:
 
		except DeniedConnection as e:
 
			print("Server {0}:{1} denied connection.".format(*host))
 
			print(e)
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts=[args.host]
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile,args.tree)
 
	host=splitHost(conf.hosts[0],conf.port)
 
	try:
 
		with ClientConnection(*host) as con:
 
			c.setConnection(con)
 
			c.init("pull")
 
			blocksToTransfer=c.negotiate()
 
			c.pullData(blocksToTransfer,args.force)
 
		print()
 
		print(stats.report())
 
	except FailedConnection: pass
 
	except DeniedConnection:
 
	except DeniedConnection as e:
 
		print("Server {0}:{1} denied connection.".format(*host))
 
		print(e)
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Miniserver(args.datafile,args.tree)
 
	try:
 
		spawnDaemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s",e)
 

	
 

	
 
parser=ArgumentParser()
 
subparsers=parser.add_subparsers()
 

	
 
pBuild=subparsers.add_parser("build")
 
pBuild.add_argument("-f","--force",action="store_true",help="force tree rebuild")
 
pBuild.add_argument("treefile", help="stored hash tree location")
 
pBuild.add_argument("datafile")
 
pBuild.set_defaults(func=buildTree)
 

	
src/netnode.py
Show inline comments
 
import os
 
import socket
 
import logging as log
 

	
 
import config as conf
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
lockFile=os.path.join(conf.directory,"dirty.lock")
 

	
 

	
 
class FailedConnection(Exception): pass
 
class LockedException(Exception): pass
 
class IncompatibleException(Exception): pass
 

	
 

	
 
class BaseConnection: # abstract
 
	def __init__(self):
 
		self._socket=None
 
		self.incoming=None
 
		self.outcoming=None
 

	
 
	def createNetworkers(self):
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
		self.incoming=NetworkReader(fr)
 
		self.outcoming=NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming,self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		try:
 
			self._socket.shutdown(socket.SHUT_RDWR)
 
			self._socket.close()
 
		except OSError:
 
			log.warning("broken connection")
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from netnode import BaseConnection,NetNode
 
import config as conf
 
import status
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,serverSocket,sslContext):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		self._socket=sslContext.wrap_socket(sock,server_side=True)
 

	
 
		log.info('Connected by {0}'.format(address))
 
		self.createNetworkers()
 

	
 

	
 
class Miniserver:
 
	def __init__(self,filename,treeFile=""):
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		self._ssl=ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile=conf.peers)
 
		self._ssl.verify_mode=ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		p=None
 
		with self._ss:
 
			while True:
 
				connection=Connection(self._ss,self._ssl)
 
				if p and p.is_alive():
 
					with connection as c:
 
						c[0].readMsg()
 
						c[1].writeMsg({"command":"deny"})
 
						c[1].writeMsg({"command":"deny","status":status.locked})
 
					continue
 
				p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
 
				p.start()
 

	
 

	
 
class Server(NetNode):
 
	def __init__(self,connection,filename,treeFile=""):
 
		super().__init__(filename,treeFile)
 
		(self._incoming,self._outcoming)=connection
 

	
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 

	
 
	@staticmethod
 
	def run(connection,*args):
 
		with connection as c:
 
			s=Server(c,*args)
 
			s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
			self._dataFileHandle=open(self._filename, mode="rb+")
 
		return self._dataFileHandle
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
		except (AssertionError,ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
				self._outcoming.writeMsg({"command":"deny"})
 
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.parameters})
 
			if jsonData["version"]<conf.lowestCompatible:
 
				self._outcoming.writeMsg({"command":"deny","status":status.incompatible.version})
 
			if jsonData["action"]=="pull" and self.isLocked():
 
				self._outcoming.writeMsg({"command":"deny"})
 
				self._outcoming.writeMsg({"command":"deny","status":status.locked})
 
			if jsonData["action"]=="push" and not self.isLocked():
 
				self._lock()
 

	
 
			self._outcoming.writeMsg({"command":"init", "version":conf.version})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			if jsonData.get("action")=="push": self._unlock()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
src/status.py
Show inline comments
 
new file 100644
 
class incompatible:
 
	version=100
 
	parameters=101
 

	
 
locked=102
0 comments (0 inline, 0 general)