Changeset - 362cff560740
[Not reviewed]
default
0 5 0
Laman - 8 years ago 2017-11-02 19:38:42

netnode locks on dirty data file, server can deny connection
5 files changed with 50 insertions and 14 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -2,24 +2,27 @@ import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import stats
 
from util import Progress
 
from hashtree import HashTree,hashBlock
 
from netnode import BaseConnection,NetNode,FailedConnection
 

	
 

	
 
class DeniedConnection(Exception): pass
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,host,port):
 
		super().__init__()
 
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext=ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname=False
 
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)
 

	
 
		self._socket=sslContext.wrap_socket(sock)
 

	
 
		try:
 
@@ -33,40 +36,42 @@ class Connection(BaseConnection):
 
			print("Error creating SSL connection to {0}:{1}".format(host,port))
 
			raise FailedConnection()
 

	
 
		self.createNetworkers()
 
		print("Connected to {0}".format(host))
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self,filename,treeFile=""):
 
		print(datetime.now(), "initializing...")
 
		super().__init__(filename,treeFile)
 

	
 
	def init(self,action):
 
		jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		if jsonData["command"]=="deny":
 
			raise DeniedConnection()
 
		assert jsonData["command"]=="init"
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree=self._tree
 
		blocksToTransfer=[]
 
		nodeStack=collections.deque([0]) # root
 

	
 
		# initialize session
 
		jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack"
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress=Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
			indices=[]
 
			for i in range(conf.batchSize):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 

	
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
@@ -106,30 +111,31 @@ class Client(NetNode):
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
			self._outcoming.writeMsg(jsonData,binData)
 
			stats.logTransferredBlock()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 
		self._outcoming.writeMsg({"command":"end","action":"push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer):
 
		self._lock()
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb+")
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 

	
 
			if i1+1!=i2:
 
@@ -141,18 +147,19 @@ class Client(NetNode):
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
			stats.logTransferredBlock()
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		self._unlock()
 

	
 
		if self._treeFile:
 
			self._updateTree()
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/config.py
Show inline comments
 
@@ -12,18 +12,18 @@ handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 
directory=os.path.join(os.path.dirname(__file__),"..")
 
certfile=os.path.join(directory,"certs/cert.pem")
 
keyfile=os.path.join(directory,"certs/key.pem")
 
peers=os.path.join(directory,"certs/peers.pem")
 

	
 
configFile=os.path.join(directory,"config.json")
 
conf=dict()
 
if os.path.isfile(configFile):
 
	with open(configFile) as f: conf=json.load(f)
 

	
 
version=0
 
version=(0,0,0)
 

	
 
hosts=conf.get("hosts",["127.0.0.1"])
 
port=conf.get("port",9901)
 

	
 
batchSize=conf.get("batchSize",256)
src/morevna.py
Show inline comments
 
import sys
 
import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon, splitHost
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection, FailedConnection
 
from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:",f,file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
 
	_checkFile(args.datafile)
 
	if os.path.isfile(args.treefile):
 
@@ -34,48 +34,54 @@ def push(args):
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts=[args.host]
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile,args.tree)
 
	for h in conf.hosts:
 
		host=splitHost(h,conf.port)
 
		stats.reset()
 
		try:
 
			with ClientConnection(*host) as con:
 
				c.setConnection(con)
 
				c.init("push")
 
				blocksToTransfer=c.negotiate()
 
				c.sendData(blocksToTransfer)
 
			print()
 
			print(stats.report())
 
			print()
 
		except FailedConnection: continue
 
		except FailedConnection: pass
 
		except DeniedConnection:
 
			print("Server {0}:{1} denied connection.".format(*host))
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts=[args.host]
 
	if args.port: conf.port=args.port
 

	
 
	c=Client(args.datafile,args.tree)
 
	host=splitHost(conf.hosts[0],conf.port)
 
	try:
 
		with ClientConnection(*host) as con:
 
			c.setConnection(con)
 
			c.init("pull")
 
			blocksToTransfer=c.negotiate()
 
			c.pullData(blocksToTransfer)
 
		print()
 
		print(stats.report())
 
	except FailedConnection: pass
 
	except DeniedConnection:
 
		print("Server {0}:{1} denied connection.".format(*host))
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
	if args.host: conf.hosts.insert(0,args.host)
 
	if args.port: conf.port=args.port
 

	
 
	s=Miniserver(args.datafile,args.tree)
 
	try:
 
		spawnDaemon(s.serve)
 
	except Exception as e:
src/netnode.py
Show inline comments
 
import os
 
import socket
 
import logging as log
 

	
 
import config as conf
 
from networkers import NetworkReader,NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
lockFile=os.path.join(conf.directory,"dirty.lock")
 

	
 

	
 
class FailedConnection(Exception): pass
 
class LockedException(Exception): pass
 

	
 

	
 
class BaseConnection: # abstract
 
	def __init__(self):
 
		self._socket=None
 
		self.incoming=None
 
		self.outcoming=None
 

	
 
	def createNetworkers(self):
 
		fr=self._socket.makefile(mode="rb")
 
		fw=self._socket.makefile(mode="wb")
 

	
 
@@ -38,16 +44,29 @@ class NetNode:
 
		self._outcoming=None
 

	
 
		self._filename=filename
 
		self._treeFile=treeFile
 

	
 
		if treeFile:
 
			self._tree=HashTree.load(treeFile)
 
		else:
 
			self._tree=HashTree.fromFile(filename)
 

	
 
		self._newLeaves=dict()
 

	
 
	def isLocked(self):
 
		return os.path.isfile(lockFile)
 

	
 
	def _lock(self):
 
		try:
 
			f=open(lockFile,"x")
 
			f.close()
 
		except FileExistsError:
 
			raise LockedException()
 

	
 
	def _unlock(self):
 
		os.remove(lockFile)
 

	
 
	def _updateTree(self):
 
		log.info("updating hash tree...")
 
		self._tree.batchUpdate(self._newLeaves.items())
 
		self._tree.save(self._treeFile)
src/server.py
Show inline comments
 
@@ -62,40 +62,43 @@ class Server(NetNode):
 
		return self._dataFileHandle
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
		except (AssertionError,ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			assert jsonData["blockSize"]==self.BLOCK_SIZE
 
			assert jsonData["blockCount"]==self._tree.leafCount
 
			self._outcoming.writeMsg({"command": "ack"})
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
				self._outcoming.writeMsg({"command":"deny"})
 
			if jsonData["action"]=="pull" and self.isLocked():
 
				self._outcoming.writeMsg({"command":"deny"})
 

	
 
			self._outcoming.writeMsg({"command":"init", "version":conf.version})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData,binData))
 

	
 
		elif jsonData["command"]=="end":
 
			self._finalize()
 
			self._locked=False
 
			if jsonData.get("action")=="push": self._unlock()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self,indices):
 
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes=[self._tree.store[i] for i in indices]
 

	
 
@@ -106,24 +109,25 @@ class Server(NetNode):
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		if self._lastIndex+1!=index:
 
			self._dataFile.seek(index*self.BLOCK_SIZE)
 
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		i=jsonData["index"]
 
		if self._lastIndex+1!=i:
 
			self._dataFile.seek(i*self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastIndex=i
 
		if self._treeFile:
 
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)
 

	
 
		return ({"command": "ack", "index": i},)
 

	
0 comments (0 inline, 0 general)