Changeset - 6a0ab4fe9f5e
[Not reviewed]
default
0 13 0
Laman - 5 years ago 2020-05-24 15:51:35

changed naming to snake case. sadly no love for camel case in this world
13 files changed with 352 insertions and 349 deletions:
0 comments (0 inline, 0 general)
src/benchmark.py
Show inline comments
 
@@ -5,21 +5,21 @@ import threading
 
from hashtree import HashTree
 

	
 

	
 
def timeF(f):
 
def time_f(f):
 
	start = time()
 
	f()
 
	end = time()
 
	print((end-start), "s")
 

	
 

	
 
def fullRead():
 
def full_read():
 
	block = True
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
		while block:
 
			block = f.read(HashTree.BLOCK_SIZE)
 

	
 

	
 
def selectedRead():
 
def selected_read():
 
	with open("/home/laman/blocks.txt") as f:
 
		blocks = [int(x) for x in f]
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
@@ -31,7 +31,7 @@ def selectedRead():
 
			i1 = i2
 

	
 

	
 
def lessSelectedRead():
 
def less_selected_read():
 
	with open("/home/laman/blocks.txt") as f:
 
		blocks = [int(x) for x in f]
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
@@ -45,19 +45,19 @@ def lessSelectedRead():
 
			i1 = i2
 

	
 

	
 
def shortSockets():
 
def short_sockets():
 
	def _server():
 
		serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		serverSock.bind(("", 12329))
 
		serverSock.listen(1)
 
		server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		server_sock.bind(("", 12329))
 
		server_sock.listen(1)
 

	
 
		for i in range(10000):
 
			sock, address = serverSock.accept()
 
			sock, address = server_sock.accept()
 
			with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
				fr.readline()
 
			sock.shutdown(socket.SHUT_RDWR)
 
			sock.close()
 
		serverSock.close()
 
		server_sock.close()
 

	
 
	def _client():
 
		for i in range(10000):
 
@@ -77,12 +77,12 @@ def shortSockets():
 
	c.join()
 

	
 

	
 
def longSockets():
 
def long_sockets():
 
	def _server():
 
		serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		serverSock.bind(("", 12330))
 
		serverSock.listen(1)
 
		sock, address = serverSock.accept()
 
		server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		server_sock.bind(("", 12330))
 
		server_sock.listen(1)
 
		sock, address = server_sock.accept()
 

	
 
		with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
			for i in range(10000):
 
@@ -90,7 +90,7 @@ def longSockets():
 

	
 
		sock.shutdown(socket.SHUT_RDWR)
 
		sock.close()
 
		serverSock.close()
 
		server_sock.close()
 

	
 
	def _client():
 
		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
@@ -110,9 +110,9 @@ def longSockets():
 
	c.join()
 

	
 

	
 
# timeF(fullRead) # 85.40341448783875 s
 
# timeF(selectedRead) # 6.774365186691284 s
 
# timeF(lessSelectedRead) # 5.930811405181885 s
 
# time_f(full_read) # 85.40341448783875 s
 
# time_f(selected_read) # 6.774365186691284 s
 
# time_f(less_selected_read) # 5.930811405181885 s
 

	
 
# timeF(shortSockets) # 3.928339719772339 s
 
# timeF(longSockets) # 0.15576839447021484 s
 
# time_f(short_sockets) # 3.928339719772339 s
 
# time_f(long_sockets) # 0.15576839447021484 s
src/client.py
Show inline comments
 
@@ -8,7 +8,7 @@ import config as conf
 
import status
 
import stats
 
from util import Progress
 
from hashtree import HashTree, hashBlock
 
from hashtree import HashTree, hash_block
 
from netnode import BaseConnection, NetNode, FailedConnection, LockedException, IncompatibleException
 
from datafile import DataFile
 

	
 
@@ -21,11 +21,11 @@ class Connection(BaseConnection):
 
		super().__init__()
 
		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext = ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname = False
 
		sslContext.load_cert_chain(conf.certfile, conf.keyfile)
 
		ssl_context = ssl.create_default_context(cafile=conf.peers)
 
		ssl_context.check_hostname = False
 
		ssl_context.load_cert_chain(conf.certfile, conf.keyfile)
 

	
 
		self._socket = sslContext.wrap_socket(sock)
 
		self._socket = ssl_context.wrap_socket(sock)
 

	
 
		try:
 
			self._socket.connect((host, port))
 
@@ -38,152 +38,153 @@ class Connection(BaseConnection):
 
			print("Error creating SSL connection to {0}:{1}".format(host, port))
 
			raise FailedConnection()
 

	
 
		self.createNetworkers()
 
		self.create_networkers()
 
		print("Connected to {0}".format(host))
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		print(datetime.now(), "initializing...")
 
		super().__init__(filename, treeFile)
 
		super().__init__(filename, tree_file)
 

	
 
	def init(self, action):
 
		jsonData = {
 
		json_data = {
 
			"command": "init",
 
			"blockSize": self._tree.BLOCK_SIZE,
 
			"blockCount": self._tree.leafCount,
 
			"blockCount": self._tree.leaf_count,
 
			"version": conf.version,
 
			"action": action
 
		}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData, binData = self._incoming.readMsg()
 
		if jsonData["command"]=="deny":
 
			if jsonData["status"]==status.incompatible.version:
 
		self._outcoming.write_msg(json_data)
 
		json_data, bin_data = self._incoming.read_msg()
 
		if json_data["command"] == "deny":
 
			if json_data["status"] == status.incompatible.version:
 
				raise DeniedConnection("Incompatible client version. Consider upgrading it.")
 
			raise DeniedConnection()
 
		assert jsonData["command"]=="init"
 
		if jsonData["version"]<conf.lowestCompatible:
 
		assert json_data["command"] == "init"
 
		if json_data["version"] < conf.lowest_compatible:
 
			raise IncompatibleException("Incompatible server version. Consider upgrading it.")
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	# Requests nodes in order of a batch DFS. Needs a stack of size O(tree_depth*batch_size). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree = self._tree
 
		blocksToTransfer = []
 
		nodeStack = collections.deque([0])  # root
 
		local_tree = self._tree
 
		blocks_to_transfer = []
 
		node_stack = collections.deque([0])  # root
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress = Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
		progress = Progress(local_tree.leaf_count)
 
		while len(node_stack) > 0:
 
			indices = []
 
			for i in range(conf.batchSize.hash):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 
			for i in range(conf.batch_size.hash):
 
				indices.append(node_stack.pop())
 
				if len(node_stack) == 0: break
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "hash"})
 

	
 
			jsonData, binData = self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode(len(indices))
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["index"] == indices
 
			assert json_data["dataType"] == "hash"
 
			stats.log_exchanged_node(len(indices))
 

	
 
			frontier = []
 
			for (j, i) in enumerate(indices):
 
				(j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
				if local_tree.store[i] != bin_data[j1:j2]:
 
					# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
 
					if 2*i+2<len(localTree.store): # inner node
 
					if 2*i+2 < len(local_tree.store):  # inner node
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
						blocks_to_transfer.append(i-local_tree.leaf_start)  # leaf
 
						progress.p(i-local_tree.leaf_start)
 
			node_stack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size = stats.formatBytes(len(blocksToTransfer)*self._tree.BLOCK_SIZE)
 
		size = stats.format_bytes(len(blocks_to_transfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocksToTransfer
 
		return blocks_to_transfer
 

	
 
	def sendData(self, blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile = DataFile.open(self._filename)
 
	def send_data(self, blocks_to_transfer):
 
		log.info(blocks_to_transfer)
 
		data_file = DataFile.open(self._filename)
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		for k in range(0, len(blocksToTransfer), conf.batchSize.data):
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = []
 
			blocks = []
 
			for j in range(conf.batchSize.data):
 
				if k+j>=len(blocksToTransfer): break
 
				i = blocksToTransfer[k+j]
 
				block = dataFile.readFrom(i)
 
			for j in range(conf.batch_size.data):
 
				if k+j >= len(blocks_to_transfer): break
 
				i = blocks_to_transfer[k+j]
 
				block = data_file.read_from(i)
 

	
 
				indices.append(i)
 
				blocks.append(block)
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 

	
 
				progress.p(k+j)
 
			if indices: self._sendData(indices, blocks)
 
			if indices: self._send_data(indices, blocks)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end", "action":"push"})
 
		self._outcoming.write_msg({"command": "end", "action": "push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		data_file.close()
 

	
 
	def pullData(self, blocksToTransfer, ignoreLock=False):
 
		if not ignoreLock:
 
	def pull_data(self, blocks_to_transfer, ignore_lock=False):
 
		if not ignore_lock:
 
			try:
 
				self._lock()
 
			except LockedException:
 
				print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				print(
 
					"The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				return
 
		log.info(blocksToTransfer)
 
		dataFile = DataFile.open(self._filename, mode="rb+")
 
		log.info(blocks_to_transfer)
 
		data_file = DataFile.open(self._filename, mode="rb+")
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress = Progress(len(blocksToTransfer))
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		for k in range(0, len(blocksToTransfer), conf.batchSize.data):
 
			indices = blocksToTransfer[k:k+conf.batchSize.data]
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"})
 
			jsonData, binData = self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = blocks_to_transfer[k:k+conf.batch_size.data]
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"})
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data
 
			for (j, i) in enumerate(indices):
 
				block = binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				dataFile.writeAt(i, block)
 
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				data_file.write_at(i, block)
 

	
 
				if self._treeFile:
 
					self._newLeaves[i+self._tree.leafStart] = hashBlock(block)
 
				if self._tree_file:
 
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 

	
 
				stats.logTransferredBlock()
 
				stats.log_transferred_block()
 
				progress.p(k+j)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 
		self._outcoming.write_msg({"command": "end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		data_file.close()
 
		self._unlock()
 

	
 
		if self._treeFile:
 
			self._updateTree()
 
		if self._tree_file:
 
			self._update_tree()
 

	
 
	def _sendData(self, indices, blocks):
 
		jsonData = {"command":"send", "index":indices, "dataType":"data"}
 
		binData = b"".join(blocks)
 
		self._outcoming.writeMsg(jsonData, binData)
 
		stats.logTransferredBlock(len(indices))
 
		jsonData, binData = self._incoming.readMsg()
 
		assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData
 
	def _send_data(self, indices, blocks):
 
		json_data = {"command": "send", "index": indices, "dataType": "data"}
 
		bin_data = b"".join(blocks)
 
		self._outcoming.write_msg(json_data, bin_data)
 
		stats.log_transferred_block(len(indices))
 
		json_data, bin_data = self._incoming.read_msg()
 
		assert json_data["command"]=="ack" and json_data["index"]==indices, json_data
 

	
 
	def setConnection(self, connection):
 
	def set_connection(self, connection):
 
		(self._incoming, self._outcoming) = connection
src/config.py
Show inline comments
 
@@ -5,17 +5,17 @@ from logging.handlers import TimedRotati
 

	
 

	
 
directory = os.path.join(os.path.dirname(__file__), "..")
 
configFile = os.path.join(directory, "config.json")
 
config_file = os.path.join(directory, "config.json")
 
conf = dict()
 
if os.path.isfile(configFile):
 
	with open(configFile) as f: conf = json.load(f)
 
if os.path.isfile(config_file):
 
	with open(config_file) as f: conf = json.load(f)
 

	
 
logFile = conf.get("logFile", "/var/log/morevna/mor.log")
 
log_file = conf.get("logFile", "/var/log/morevna/mor.log")
 

	
 
logger = log.getLogger()
 
logger.setLevel(log.INFO)
 
formatter = log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
 
handler = TimedRotatingFileHandler(logFile, when="midnight", backupCount=9)
 
handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=9)
 
handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 
@@ -24,12 +24,12 @@ keyfile = os.path.join(directory,"certs/
 
peers = os.path.join(directory,"certs/peers.pem")
 

	
 
version = [0, 1, 1]
 
lowestCompatible=[0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway
 
lowest_compatible = [0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway
 

	
 
hosts=conf.get("hosts", ["127.0.0.1"])
 
port=conf.get("port", 9901)
 

	
 
bSize=conf.get("batchSize", dict())
 
class batchSize:
 
bSize = conf.get("batch_size", dict())
 
class batch_size:
 
	hash=bSize.get("hash", 256)
 
	data=bSize.get("data", 64)
src/datafile.py
Show inline comments
 
@@ -5,25 +5,25 @@ BLOCK_SIZE = HashTree.BLOCK_SIZE
 

	
 

	
 
class DataFile:
 
	def __init__(self, fileHandle):
 
		self._lastIndex = 0
 
		self._f = fileHandle
 
	def __init__(self, file_handle):
 
		self._last_index = 0
 
		self._f = file_handle
 

	
 
	@staticmethod
 
	def open(filename, mode="rb"):
 
		return DataFile(open(filename, mode=mode))
 

	
 
	def writeAt(self, i, blockData):
 
		if i!=self._lastIndex+1:
 
	def write_at(self, i, block_data):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._f.write(blockData)
 
		self._lastIndex = i
 
		self._f.write(block_data)
 
		self._last_index = i
 

	
 
	def readFrom(self, i, byteCount=BLOCK_SIZE):
 
		if i!=self._lastIndex+1:
 
	def read_from(self, i, byte_count=BLOCK_SIZE):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._lastIndex = i
 
		return self._f.read(byteCount)
 
		self._last_index = i
 
		return self._f.read(byte_count)
 

	
 
	def close(self):
 
		self._f.close()
src/hashtree.py
Show inline comments
 
@@ -6,7 +6,7 @@ from datetime import datetime
 
from util import Progress
 

	
 

	
 
def hashBlock(data):
 
def hash_block(data):
 
	return hashlib.sha256(data).digest()[-HashTree.HASH_LEN:]
 

	
 

	
 
@@ -14,30 +14,30 @@ class HashTree:
 
	HASH_LEN = 16 # bytes
 
	BLOCK_SIZE = 4096 # bytes
 
	
 
	## Prepares a tree containing leafCount leaves.
 
	def __init__(self, leafCount):
 
		self.store = [b""]*(leafCount*2-1)
 
		self.leafStart = leafCount-1
 
		self.leafCount = leafCount
 
		self._index = self.leafStart
 
	## Prepares a tree containing leaf_count leaves.
 
	def __init__(self, leaf_count):
 
		self.store = [b""]*(leaf_count*2-1)
 
		self.leaf_start = leaf_count-1
 
		self.leaf_count = leaf_count
 
		self._index = self.leaf_start
 
		
 
	@classmethod
 
	def fromFile(cls, filename):
 
	def from_file(cls, filename):
 
		with open(filename, "rb") as f:
 
			stat = os.fstat(f.fileno())
 
			size = stat.st_size # !! symlinks
 
			leafCount = (size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks
 
			res = cls(leafCount)
 
			leaf_count = (size-1)//HashTree.BLOCK_SIZE+1  # number of leaf blocks
 
			res = cls(leaf_count)
 
			print(datetime.now(), "hashing file:")
 

	
 
			progress = Progress(leafCount)
 
			for i in range(leafCount):
 
			progress = Progress(leaf_count)
 
			for i in range(leaf_count):
 
				data = f.read(HashTree.BLOCK_SIZE)
 
				res.insertLeaf(hashBlock(data))
 
				res.insert_leaf(hash_block(data))
 

	
 
				progress.p(i)
 
			progress.done()
 
		res.buildTree()
 
		res.build_tree()
 
		
 
		return res
 

	
 
@@ -46,10 +46,10 @@ class HashTree:
 
		with open(filename, "rb") as f:
 
			stat = os.fstat(f.fileno())
 
			size = stat.st_size
 
			nodeCount = size//HashTree.HASH_LEN
 
			res = cls((nodeCount+1)//2)
 
			node_count = size//HashTree.HASH_LEN
 
			res = cls((node_count+1)//2)
 

	
 
			for i in range(nodeCount):
 
			for i in range(node_count):
 
				res.store[i] = f.read(HashTree.HASH_LEN)
 
		return res
 

	
 
@@ -61,44 +61,44 @@ class HashTree:
 
	## Inserts a leaf at the first empty position.
 
	#
 
	#	Useful and used only during the tree construction.
 
	def insertLeaf(self,h):
 
	def insert_leaf(self, h):
 
		self.store[self._index] = h
 
		self._index += 1
 
		
 
	## Updates a hash stored in the leaf.
 
	def updateLeaf(self, index, h):
 
		if index<self.leafStart: raise IndexError()
 
	def update_leaf(self, index, h):
 
		if index<self.leaf_start: raise IndexError()
 
		
 
		self.store[index] = h
 
		self.updateNode((index-1)//2)
 
		self.update_node((index-1)//2)
 
	
 
	## Updates the node at index and all its ancestors.
 
	def updateNode(self, index):
 
	def update_node(self, index):
 
		while index>=0:
 
			self.store[index] = hashBlock(self.store[index*2+1]+self.store[index*2+2])
 
			self.store[index] = hash_block(self.store[index*2+1] + self.store[index*2+2])
 
			index = (index-1)//2
 
			
 
	## Fast construction of the tree over the leaves. O(n).
 
	def buildTree(self):
 
	def build_tree(self):
 
		print(datetime.now(), "building tree:")
 
		progress=Progress(-1, self.leafStart-1)
 
		for i in range(self.leafStart-1, -1, -1):
 
			self.store[i] = hashBlock(self.store[i*2+1]+self.store[i*2+2])
 
		progress = Progress(-1, self.leaf_start-1)
 
		for i in range(self.leaf_start-1, -1, -1):
 
			self.store[i] = hash_block(self.store[i*2+1] + self.store[i*2+2])
 
			progress.p(i)
 
		progress.done()
 

	
 
	## Update faster than repeated insertLeaf.
 
	def batchUpdate(self, keysHashes):
 
	def batch_update(self, keys_hashes):
 
		queue = collections.deque()
 
		for (k, v) in sorted(keysHashes):
 
		for (k, v) in sorted(keys_hashes):
 
			self.store[k] = v
 
			parentK = (k-1)//2
 
			if len(queue)==0 or queue[-1]!=parentK:
 
				queue.append(parentK)
 
			parent_k = (k-1)//2
 
			if len(queue)==0 or queue[-1]!=parent_k:
 
				queue.append(parent_k)
 

	
 
		while len(queue)>0:
 
			k = queue.pop()
 
			self.store[k] = hashBlock(self.store[k*2+1]+self.store[k*2+2])
 
			parentK = (k-1)//2
 
			if (len(queue)==0 or queue[0]!=parentK) and k!=0:
 
				queue.appendleft(parentK)
 
			self.store[k] = hash_block(self.store[k*2+1] + self.store[k*2+2])
 
			parent_k = (k-1)//2
 
			if (len(queue)==0 or queue[0]!=parent_k) and k!=0:
 
				queue.appendleft(parent_k)
src/morevna.py
Show inline comments
 
@@ -3,7 +3,7 @@ import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon, splitHost
 
from util import spawn_daemon, split_host
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
@@ -11,42 +11,42 @@ from client import Client, Connection as
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
def _check_file(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:", f, file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
 
	_checkFile(args.datafile)
 
def build_tree(args):
 
	_check_file(args.datafile)
 
	if os.path.isfile(args.treefile):
 
		treeMod = os.stat(args.treefile).st_mtime
 
		dataMod = os.stat(args.datafile).st_mtime
 
		if dataMod<treeMod and not args.force:
 
		tree_mod = os.stat(args.treefile).st_mtime
 
		data_mod = os.stat(args.datafile).st_mtime
 
		if data_mod<tree_mod and not args.force:
 
			print("tree file is up to date")
 
			return
 

	
 
	tree = HashTree.fromFile(args.datafile)
 
	tree = HashTree.from_file(args.datafile)
 
	tree.save(args.treefile)
 

	
 

	
 
def push(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts = [args.host]
 
	if args.port: conf.port = args.port
 

	
 
	c = Client(args.datafile,args.tree)
 
	for h in conf.hosts:
 
		host = splitHost(h,conf.port)
 
		host = split_host(h, conf.port)
 
		stats.reset()
 
		try:
 
			with ClientConnection(*host) as con:
 
				c.setConnection(con)
 
				c.set_connection(con)
 
				c.init("push")
 
				blocksToTransfer = c.negotiate()
 
				c.sendData(blocksToTransfer)
 
				blocks_to_transfer = c.negotiate()
 
				c.send_data(blocks_to_transfer)
 
			print()
 
			print(stats.report())
 
			print()
 
@@ -56,21 +56,22 @@ def push(args):
 
			print(e)
 
		except IncompatibleException as e: print(e)
 

	
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts = [args.host]
 
	if args.port: conf.port = args.port
 

	
 
	c = Client(args.datafile, args.tree)
 
	host = splitHost(conf.hosts[0], conf.port)
 
	host = split_host(conf.hosts[0], conf.port)
 
	try:
 
		with ClientConnection(*host) as con:
 
			c.setConnection(con)
 
			c.set_connection(con)
 
			c.init("pull")
 
			blocksToTransfer = c.negotiate()
 
			c.pullData(blocksToTransfer, args.force)
 
			blocks_to_transfer = c.negotiate()
 
			c.pull_data(blocks_to_transfer, args.force)
 
		print()
 
		print(stats.report())
 
	except FailedConnection: pass
 
@@ -80,15 +81,15 @@ def pull(args):
 

	
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts.insert(0, args.host)
 
	if args.port: conf.port = args.port
 

	
 
	try:
 
		s = Miniserver(args.datafile,args.tree)
 
		spawnDaemon(s.serve)
 
		spawn_daemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s", e)
 
		print("Failed to start:\n  ", e)
 
@@ -97,33 +98,33 @@ def serve(args):
 
parser = ArgumentParser()
 
subparsers = parser.add_subparsers()
 

	
 
pBuild = subparsers.add_parser("build")
 
pBuild.add_argument("-f", "--force", action="store_true", help="force tree rebuild")
 
pBuild.add_argument("treefile", help="stored hash tree location")
 
pBuild.add_argument("datafile")
 
pBuild.set_defaults(func=buildTree)
 
p_build = subparsers.add_parser("build")
 
p_build.add_argument("-f", "--force", action="store_true", help="force tree rebuild")
 
p_build.add_argument("treefile", help="stored hash tree location")
 
p_build.add_argument("datafile")
 
p_build.set_defaults(func=build_tree)
 

	
 
pUpdate = subparsers.add_parser("push")
 
pUpdate.add_argument("-p", "--port", type=int)
 
pUpdate.add_argument("--host")
 
pUpdate.add_argument("-t", "--tree", help="stored hash tree location")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=push)
 
p_update = subparsers.add_parser("push")
 
p_update.add_argument("-p", "--port", type=int)
 
p_update.add_argument("--host")
 
p_update.add_argument("-t", "--tree", help="stored hash tree location")
 
p_update.add_argument("datafile")
 
p_update.set_defaults(func=push)
 

	
 
pUpdate = subparsers.add_parser("pull")
 
pUpdate.add_argument("-p", "--port", type=int)
 
pUpdate.add_argument("--host")
 
pUpdate.add_argument("-t", "--tree", help="stored hash tree location")
 
pUpdate.add_argument("-f", "--force", action="store_true", help="ignore lock file")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=pull)
 
p_update = subparsers.add_parser("pull")
 
p_update.add_argument("-p", "--port", type=int)
 
p_update.add_argument("--host")
 
p_update.add_argument("-t", "--tree", help="stored hash tree location")
 
p_update.add_argument("-f", "--force", action="store_true", help="ignore lock file")
 
p_update.add_argument("datafile")
 
p_update.set_defaults(func=pull)
 

	
 
pServe = subparsers.add_parser("serve")
 
pServe.add_argument("-p", "--port", type=int)
 
pServe.add_argument("--host")
 
pServe.add_argument("-t", "--tree", help="stored hash tree location")
 
pServe.add_argument("datafile")
 
pServe.set_defaults(func=serve)
 
p_serve = subparsers.add_parser("serve")
 
p_serve.add_argument("-p", "--port", type=int)
 
p_serve.add_argument("--host")
 
p_serve.add_argument("-t", "--tree", help="stored hash tree location")
 
p_serve.add_argument("datafile")
 
p_serve.set_defaults(func=serve)
 

	
 
args = parser.parse_args()
 
try: args.func(args)
src/netnode.py
Show inline comments
 
@@ -7,7 +7,7 @@ from networkers import NetworkReader,Net
 
from hashtree import HashTree
 

	
 

	
 
lockFile = os.path.join(conf.directory,"dirty.lock")
 
lock_file = os.path.join(conf.directory, "dirty.lock")
 

	
 

	
 
class FailedConnection(Exception): pass
 
@@ -21,7 +21,7 @@ class BaseConnection: # abstract
 
		self.incoming = None
 
		self.outcoming = None
 

	
 
	def createNetworkers(self):
 
	def create_networkers(self):
 
		fr = self._socket.makefile(mode="rb")
 
		fw = self._socket.makefile(mode="wb")
 

	
 
@@ -40,35 +40,35 @@ class BaseConnection: # abstract
 

	
 

	
 
class NetNode:
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		self._incoming = None
 
		self._outcoming = None
 

	
 
		self._filename = filename
 
		self._treeFile = treeFile
 
		self._tree_file = tree_file
 

	
 
		if treeFile:
 
			self._tree = HashTree.load(treeFile)
 
		if tree_file:
 
			self._tree = HashTree.load(tree_file)
 
		else:
 
			self._tree = HashTree.fromFile(filename)
 
			self._tree = HashTree.from_file(filename)
 

	
 
		self._newLeaves = dict()
 
		self._new_leaves = dict()
 

	
 
	def isLocked(self):
 
		return os.path.isfile(lockFile)
 
	def is_locked(self):
 
		return os.path.isfile(lock_file)
 

	
 
	def _lock(self):
 
		try:
 
			f = open(lockFile,"x")
 
			f = open(lock_file, "x")
 
			f.close()
 
		except FileExistsError:
 
			raise LockedException()
 

	
 
	def _unlock(self):
 
		os.remove(lockFile)
 
		os.remove(lock_file)
 

	
 
	def _updateTree(self):
 
	def _update_tree(self):
 
		log.info("updating hash tree...")
 
		self._tree.batchUpdate(self._newLeaves.items())
 
		self._tree.save(self._treeFile)
 
		self._tree.batch_update(self._new_leaves.items())
 
		self._tree.save(self._tree_file)
 
		log.info("tree updated")
src/networkers.py
Show inline comments
 
@@ -7,42 +7,42 @@ class NetworkReader:
 
	def __init__(self, stream):
 
		self._stream = stream
 

	
 
	def readMsg(self):
 
	def read_msg(self):
 
		data = self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		jsonLength = int(data.split(b":")[1].strip()) # "json-length: length" -> length
 
		stats.log_received(data)
 
		json_length = int(data.split(b":")[1].strip())  # "json-length: length" -> length
 

	
 
		data = self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		binLength = int(data.split(b":")[1].strip()) # "bin-length: length" -> length
 
		stats.log_received(data)
 
		bin_length = int(data.split(b":")[1].strip())  # "bin-length: length" -> length
 

	
 
		jsonData = self._stream.read(jsonLength)
 
		assert len(jsonData)==jsonLength
 
		stats.logReceived(jsonData)
 
		jsonData = json.loads(str(jsonData,encoding="utf-8"))
 
		json_data = self._stream.read(json_length)
 
		assert len(json_data)==json_length
 
		stats.log_received(json_data)
 
		json_data = json.loads(str(json_data, encoding="utf-8"))
 

	
 
		binData = self._stream.read(binLength)
 
		assert len(binData)==binLength
 
		stats.logReceived(binData)
 
		bin_data = self._stream.read(bin_length)
 
		assert len(bin_data)==bin_length
 
		stats.log_received(bin_data)
 
		
 
		return (jsonData, binData)
 
		return (json_data, bin_data)
 
		
 

	
 
class NetworkWriter:
 
	def __init__(self, stream):
 
		self._stream = stream
 

	
 
	def writeMsg(self, *args):
 
		msg = self.prepMsg(*args)
 
	def write_msg(self, *args):
 
		msg = self.prep_msg(*args)
 
		self._stream.write(msg)
 
		self._stream.flush()
 
		stats.logSent(msg)
 
		stats.log_sent(msg)
 

	
 
	def prepMsg(self, jsonData, binData=b""):
 
		jsonData = bytes(json.dumps(jsonData,separators=(',',':'))+"\n", encoding="utf-8")
 
		jsonLength = bytes("json-length: "+str(len(jsonData))+"\n", encoding="utf-8")
 
		binLength = bytes("bin-length: "+str(len(binData))+"\n", encoding="utf-8")
 
	def prep_msg(self, json_data, bin_data=b""):
 
		json_data = bytes(json.dumps(json_data, separators=(',', ':'))+"\n", encoding="utf-8")
 
		json_length = bytes("json-length: "+str(len(json_data))+"\n", encoding="utf-8")
 
		bin_length = bytes("bin-length: "+str(len(bin_data))+"\n", encoding="utf-8")
 

	
 
		return b"".join((jsonLength, binLength, jsonData, binData))
 
		return b"".join((json_length, bin_length, json_data, bin_data))
src/server.py
Show inline comments
 
@@ -3,7 +3,7 @@ import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from hashtree import hash_block
 
from netnode import BaseConnection,NetNode
 
import config as conf
 
import status
 
@@ -11,25 +11,25 @@ from datafile import DataFile
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self, serverSocket, sslContext):
 
	def __init__(self, server_socket, ssl_context):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		sock, address = server_socket.accept()
 
		peer = sock.getpeername()
 
		try:
 
			self._socket = sslContext.wrap_socket(sock, server_side=True)
 
			self._socket = ssl_context.wrap_socket(sock, server_side=True)
 
		except (ssl.SSLError,OSError) as e:
 
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
 
			raise e
 

	
 
		log.info('Connected by {0}'.format(address))
 
		self.createNetworkers()
 
		self.create_networkers()
 

	
 

	
 
class Miniserver:
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		self._filename = filename
 
		self._treeFile = treeFile
 
		self._tree_file = tree_file
 

	
 
		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
 
		self._ssl.verify_mode = ssl.CERT_REQUIRED
 
@@ -47,21 +47,21 @@ class Miniserver:
 
				except (ssl.SSLError, OSError): continue
 
				if p and p.is_alive():
 
					with connection as c:
 
						c[0].readMsg()
 
						c[1].writeMsg({"command":"deny", "status":status.locked})
 
						c[0].read_msg()
 
						c[1].write_msg({"command": "deny", "status":status.locked})
 
					continue
 
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._treeFile))
 
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._tree_file))
 
				p.start()
 

	
 

	
 
class Server(NetNode):
 
	def __init__(self, connection, filename, treeFile=""):
 
		super().__init__(filename, treeFile)
 
	def __init__(self, connection, filename, tree_file=""):
 
		super().__init__(filename, tree_file)
 
		(self._incoming, self._outcoming) = connection
 

	
 
		self.BLOCK_SIZE = self._tree.BLOCK_SIZE
 

	
 
		self._dataFileObj = None
 
		self._data_file_obj = None
 

	
 
	@staticmethod
 
	def run(connection, *args):
 
@@ -70,88 +70,88 @@ class Server(NetNode):
 
			s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileObj:
 
			self._dataFileObj = DataFile.open(self._filename, mode="rb+")
 
		return self._dataFileObj
 
	def _data_file(self):
 
		if not self._data_file_obj:
 
			self._data_file_obj = DataFile.open(self._filename, mode="rb+")
 
		return self._data_file_obj
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
			while self._serve_one(): pass
 
		except (AssertionError, ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData, binData = self._incoming.readMsg()
 
	def _serve_one(self):
 
		json_data, bin_data = self._incoming.read_msg()
 

	
 
		if jsonData["command"]=="init":
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.parameters})
 
			if jsonData["version"]<conf.lowestCompatible:
 
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.version})
 
			if jsonData["action"]=="pull" and self.isLocked():
 
				self._outcoming.writeMsg({"command":"deny", "status":status.locked})
 
			if jsonData["action"]=="push" and not self.isLocked():
 
		if json_data["command"]=="init":
 
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
 
			if json_data["version"]<conf.lowest_compatible:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
 
			if json_data["action"]=="pull" and self.is_locked():
 
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
 
			if json_data["action"]=="push" and not self.is_locked():
 
				self._lock()
 

	
 
			self._outcoming.writeMsg({"command":"init", "version":conf.version})
 
			self._outcoming.write_msg({"command": "init", "version": conf.version})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
		elif json_data["command"]=="req":
 
			if json_data["dataType"]=="data":
 
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData, binData))
 
		elif json_data["command"]=="send" and json_data["dataType"]=="data":
 
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))
 

	
 
		elif jsonData["command"]=="end":
 
		elif json_data["command"]=="end":
 
			self._finalize()
 
			if jsonData.get("action")=="push": self._unlock()
 
			if json_data.get("action")=="push": self._unlock()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 
			assert False, json_data["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self, indices):
 
	def _request_hash(self, indices):
 
		log.info("received request for nodes {0}".format(indices))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes = [self._tree.store[i] for i in indices]
 

	
 
		jsonResponse = {"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse = b"".join(hashes)
 
		json_response = {"command": "send", "index": indices, "dataType": "hash"}
 
		bin_response = b"".join(hashes)
 

	
 
		return (jsonResponse, binResponse)
 
		return (json_response, bin_response)
 

	
 
	def _requestData(self, index):
 
	def _request_data(self, index):
 
		log.info("received request for data blocks {0}".format(index))
 

	
 
		jsonResponse = {"command":"send", "index":index, "dataType":"data"}
 
		json_response = {"command": "send", "index": index, "dataType": "data"}
 
		blocks = []
 
		for i in index:
 
			blocks.append(self._dataFile.readFrom(i))
 
			blocks.append(self._data_file.read_from(i))
 

	
 
		return (jsonResponse, b"".join(blocks))
 
		return (json_response, b"".join(blocks))
 

	
 
	def _receiveData(self, jsonData, binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))
 
	def _receive_data(self, json_data, bin_data):
 
		if not self.is_locked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))
 

	
 
		indices = jsonData["index"]
 
		indices = json_data["index"]
 
		for (i, k) in enumerate(indices):
 
			block = binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._dataFile.writeAt(k, block)
 
			if self._treeFile:
 
				self._newLeaves[k+self._tree.leafStart] = hashBlock(block)
 
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._data_file.write_at(k, block)
 
			if self._tree_file:
 
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileObj = None
 
		if self._treeFile:
 
			self._updateTree()
 
		self._data_file.close()
 
		self._data_file_obj = None
 
		if self._tree_file:
 
			self._update_tree()
 
		log.info("done")
src/stats.py
Show inline comments
 
@@ -2,26 +2,27 @@ class Stats:
 
	def __init__(self):
 
		self.received = 0
 
		self.sent = 0
 
		self.exchangedNodes = 0
 
		self.transferredBlocks = 0
 
		self.exchanged_nodes = 0
 
		self.transferred_blocks = 0
 

	
 

	
 
stats = Stats()
 

	
 

	
 
def logReceived(data):
 
def log_received(data):
 
	stats.received += len(data)
 

	
 

	
 
def logSent(data):
 
def log_sent(data):
 
	stats.sent += len(data)
 

	
 

	
 
def logExchangedNode(k=1):
 
	stats.exchangedNodes += k
 
def log_exchanged_node(k=1):
 
	stats.exchanged_nodes += k
 

	
 

	
 
def logTransferredBlock(k=1):
 
	stats.transferredBlocks += k
 
def log_transferred_block(k=1):
 
	stats.transferred_blocks += k
 

	
 

	
 
def reset():
 
@@ -34,16 +35,16 @@ def report():
 
sent {sf} ({s:,} B)
 
exchanged {nodes:,} hash tree nodes
 
transferred {blocks:,} blocks""".format(
 
		rf=formatBytes(stats.received),
 
		rf=format_bytes(stats.received),
 
		r=stats.received,
 
		sf=formatBytes(stats.sent),
 
		sf=format_bytes(stats.sent),
 
		s=stats.sent,
 
		nodes=stats.exchangedNodes,
 
		blocks=stats.transferredBlocks
 
		nodes=stats.exchanged_nodes,
 
		blocks=stats.transferred_blocks
 
	)
 

	
 

	
 
def formatBytes(x):
 
def format_bytes(x):
 
	exts = ["B", "kiB", "MiB", "GiB", "TiB", "PiB"]
 
	i = 0
 
	while x>1024:
src/tests/test_hashtree.py
Show inline comments
 
@@ -8,24 +8,24 @@ from . import RedirectedOutput
 
random.seed(17)
 

	
 

	
 
def buildTree(leaves):
 
def build_tree(leaves):
 
	tree = HashTree(len(leaves))
 
	for l in leaves:
 
		tree.insertLeaf(l)
 
	tree.buildTree()
 
		tree.insert_leaf(l)
 
	tree.build_tree()
 
	return tree
 

	
 

	
 
class TestMorevna(RedirectedOutput, TestCase):
 
	def test_batchUpdate(self):
 
	def test_batch_update(self):
 
		leaves = [b"a" for i in range(8)]
 
		t1 = buildTree(leaves)
 
		t1 = build_tree(leaves)
 
		keys = list(range(8))
 

	
 
		for i in range(8):
 
			random.shuffle(keys)
 
			for k in keys[:i+1]:
 
				leaves[k] = bytes([random.randrange(256)])
 
			t2 = buildTree(leaves)
 
			t1.batchUpdate((k+t1.leafStart, leaves[k]) for k in keys[:i+1])
 
			t2 = build_tree(leaves)
 
			t1.batch_update((k + t1.leaf_start, leaves[k]) for k in keys[:i + 1])
 
			self.assertEqual(t1.store, t2.store)
src/tests/test_overall.py
Show inline comments
 
@@ -17,14 +17,14 @@ handler = FileHandler("/tmp/morevna.log"
 
handler.setFormatter(config.formatter)
 
config.logger.addHandler(handler)
 

	
 
config.batchSize.hash = 8
 
config.batchSize.data = 8
 
config.batch_size.hash = 8
 
config.batch_size.data = 8
 

	
 
dataDir = os.path.join(config.directory, "src/tests/data")
 
filename = os.path.join(dataDir, "test.img")
 
data_dir = os.path.join(config.directory, "src/tests/data")
 
filename = os.path.join(data_dir, "test.img")
 

	
 

	
 
def compareFiles(f1, f2):
 
def compare_files(f1, f2):
 
	with open(f1, mode="rb") as f:
 
		h2 = hashlib.sha256(f.read()).hexdigest()
 
	with open(f2, mode="rb") as f:
 
@@ -36,7 +36,7 @@ class TestMorevna(RedirectedOutput, Test
 
	_stdout = None
 

	
 
	def setUp(self):
 
		src = os.path.join(dataDir, "test1.img")
 
		src = os.path.join(data_dir, "test1.img")
 
		shutil.copyfile(src, filename)
 

	
 
	@classmethod
 
@@ -45,15 +45,15 @@ class TestMorevna(RedirectedOutput, Test
 
		os.remove(filename)
 

	
 
	def test_build(self):
 
		treeFile = os.path.join(dataDir, "test.bin")
 
		refFile = os.path.join(dataDir, "test1.bin")
 
		tree_file = os.path.join(data_dir, "test.bin")
 
		ref_file = os.path.join(data_dir, "test1.bin")
 

	
 
		tree = HashTree.fromFile(os.path.join(dataDir, "test1.img"))
 
		tree.save(treeFile)
 
		tree = HashTree.from_file(os.path.join(data_dir, "test1.img"))
 
		tree.save(tree_file)
 

	
 
		self.assertEqual(*compareFiles(refFile, treeFile))
 
		self.assertEqual(*compare_files(ref_file, tree_file))
 

	
 
		os.remove(treeFile)
 
		os.remove(tree_file)
 

	
 
	def test_push(self):
 
		config.port += 1
 
@@ -61,35 +61,35 @@ class TestMorevna(RedirectedOutput, Test
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		for clientFile in ("test2.img", "test3.img", "test4.img"):
 
			clientFile = os.path.join(dataDir, clientFile)
 
			c = Client(clientFile)
 
		for client_file in ("test2.img", "test3.img", "test4.img"):
 
			client_file = os.path.join(data_dir, client_file)
 
			c = Client(client_file)
 
			with ClientConnection("127.0.0.1", config.port) as con:
 
				c.setConnection(con)
 
				c.set_connection(con)
 
				c.init("push")
 
				blocksToTransfer = c.negotiate()
 
				c.sendData(blocksToTransfer)
 
				blocks_to_transfer = c.negotiate()
 
				c.send_data(blocks_to_transfer)
 

	
 
			self.assertEqual(*compareFiles(clientFile, filename))
 
			self.assertEqual(*compare_files(client_file, filename))
 

	
 
		p.terminate()
 
		p.join()
 

	
 
	def test_pull(self):
 
		config.port += 1
 
		serverFile = os.path.join(dataDir, "test3.img")
 
		ms = Miniserver(serverFile)
 
		server_file = os.path.join(data_dir, "test3.img")
 
		ms = Miniserver(server_file)
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		c = Client(filename)
 
		with ClientConnection("127.0.0.1", config.port) as con:
 
			c.setConnection(con)
 
			c.set_connection(con)
 
			c.init("pull")
 
			blocksToTransfer = c.negotiate()
 
			c.pullData(blocksToTransfer)
 
			blocks_to_transfer = c.negotiate()
 
			c.pull_data(blocks_to_transfer)
 

	
 
		self.assertEqual(*compareFiles(serverFile, filename))
 
		self.assertEqual(*compare_files(server_file, filename))
 

	
 
		p.terminate()
 
		p.join()
 
@@ -100,17 +100,17 @@ class TestMorevna(RedirectedOutput, Test
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		c1 = Client(os.path.join(dataDir, "test2.img"))
 
		c1 = Client(os.path.join(data_dir, "test2.img"))
 
		with ClientConnection("127.0.0.1", config.port) as con1:
 
			c1.setConnection(con1)
 
			c1.set_connection(con1)
 
			c1.init("push")
 

	
 
			c2 = Client(os.path.join(dataDir, "test3.img"))
 
			c2 = Client(os.path.join(data_dir, "test3.img"))
 
			with ClientConnection("127.0.0.1", config.port) as con2:
 
				c2.setConnection(con2)
 
				c2.set_connection(con2)
 
				with self.assertRaises(DeniedConnection):
 
					c2.init("push")
 
			c1.sendData([]) # to unlock the server
 
			c1.send_data([]) # to unlock the server
 

	
 
		p.terminate()
 
		p.join()
src/util.py
Show inline comments
 
@@ -5,7 +5,7 @@ import collections
 
from datetime import datetime
 

	
 

	
 
def spawnDaemon(fun):
 
def spawn_daemon(fun):
 
	# do the UNIX double-fork magic, see Stevens' "Advanced
 
	# Programming in the UNIX Environment" for details (ISBN 0201563177)
 
	try:
 
@@ -36,9 +36,9 @@ def spawnDaemon(fun):
 
	os._exit(os.EX_OK)
 

	
 

	
 
def splitHost(host, defaultPort=0):
 
def split_host(host, default_port=0):
 
	address, _, port = host.partition(":")
 
	if not port: port = defaultPort
 
	if not port: port = default_port
 
	return (address, port)
 

	
 

	
 
@@ -61,7 +61,7 @@ class Progress:
 
		if len(self._past)==0 or (now-self._past[-1][0]).total_seconds()>1:
 
			self._past.append((now, i))
 
		if res!=self._last or (now-self._past[0][0]).total_seconds()>5:
 
			eta = formatSeconds(self.eta(i))
 
			eta = format_seconds(self.eta(i))
 
			self._print("{0} (ETA {1})".format(res, eta))
 
			self._last = res
 
			while (now-self._past[0][0]).total_seconds()>5:
 
@@ -86,7 +86,7 @@ class Progress:
 
		print("\r"+s, end=end)
 

	
 

	
 
def formatSeconds(secs):
 
def format_seconds(secs):
 
	if math.isnan(secs): return "?"
 
	secs = round(secs)
 
	if secs<60: return "{0}s".format(secs)
0 comments (0 inline, 0 general)