Changeset - 6a0ab4fe9f5e
[Not reviewed]
default
0 13 0
Laman - 5 years ago 2020-05-24 15:51:35

changed naming to snake case. sadly no love for camel case in this world
13 files changed with 367 insertions and 364 deletions:
0 comments (0 inline, 0 general)
src/benchmark.py
Show inline comments
 
from time import time
 
import socket
 
import threading
 

	
 
from hashtree import HashTree
 

	
 

	
 
def timeF(f):
 
def time_f(f):
 
	start = time()
 
	f()
 
	end = time()
 
	print((end-start), "s")
 

	
 

	
 
def fullRead():
 
def full_read():
 
	block = True
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
		while block:
 
			block = f.read(HashTree.BLOCK_SIZE)
 

	
 

	
 
def selectedRead():
 
def selected_read():
 
	with open("/home/laman/blocks.txt") as f:
 
		blocks = [int(x) for x in f]
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
		i1 = -1
 
		for i2 in blocks:
 
			if i1+1!=i2:
 
				f.seek(i2*HashTree.BLOCK_SIZE)
 
			block = f.read(HashTree.BLOCK_SIZE)
 
			i1 = i2
 

	
 

	
 
def lessSelectedRead():
 
def less_selected_read():
 
	with open("/home/laman/blocks.txt") as f:
 
		blocks = [int(x) for x in f]
 
	with open("/home/laman/ext2.img", mode="rb") as f:
 
		i1 = -1
 
		for i2 in blocks:
 
			if i2<=i1+8:
 
				block = f.read(HashTree.BLOCK_SIZE*(i2-i1))
 
			else:
 
				f.seek(i2*HashTree.BLOCK_SIZE)
 
				block = f.read(HashTree.BLOCK_SIZE)
 
			i1 = i2
 

	
 

	
 
def shortSockets():
 
def short_sockets():
 
	def _server():
 
		serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		serverSock.bind(("", 12329))
 
		serverSock.listen(1)
 
		server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		server_sock.bind(("", 12329))
 
		server_sock.listen(1)
 

	
 
		for i in range(10000):
 
			sock, address = serverSock.accept()
 
			sock, address = server_sock.accept()
 
			with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
				fr.readline()
 
			sock.shutdown(socket.SHUT_RDWR)
 
			sock.close()
 
		serverSock.close()
 
		server_sock.close()
 

	
 
	def _client():
 
		for i in range(10000):
 
			sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
			sock.connect(("127.0.0.1", 12329))
 
			with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
				fw.write(b"x"*4096+b"\n")
 
				fw.flush()
 
			sock.shutdown(socket.SHUT_RDWR)
 
			sock.close()
 

	
 
	s = threading.Thread(target=_server)
 
	s.start()
 
	c = threading.Thread(target=_client)
 
	c.start()
 
	s.join()
 
	c.join()
 

	
 

	
 
def longSockets():
 
def long_sockets():
 
	def _server():
 
		serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		serverSock.bind(("", 12330))
 
		serverSock.listen(1)
 
		sock, address = serverSock.accept()
 
		server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		server_sock.bind(("", 12330))
 
		server_sock.listen(1)
 
		sock, address = server_sock.accept()
 

	
 
		with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
			for i in range(10000):
 
				fr.readline()
 

	
 
		sock.shutdown(socket.SHUT_RDWR)
 
		sock.close()
 
		serverSock.close()
 
		server_sock.close()
 

	
 
	def _client():
 
		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		sock.connect(("127.0.0.1", 12330))
 
		with sock.makefile(mode="rb") as fr, sock.makefile(mode="wb") as fw:
 
			for i in range(10000):
 
					fw.write(b"x"*4096+b"\n")
 
					fw.flush()
 
				fw.write(b"x"*4096+b"\n")
 
				fw.flush()
 
		sock.shutdown(socket.SHUT_RDWR)
 
		sock.close()
 

	
 
	s = threading.Thread(target=_server)
 
	s.start()
 
	c = threading.Thread(target=_client)
 
	c.start()
 
	s.join()
 
	c.join()
 

	
 

	
 
# timeF(fullRead) # 85.40341448783875 s
 
# timeF(selectedRead) # 6.774365186691284 s
 
# timeF(lessSelectedRead) # 5.930811405181885 s
 
# time_f(full_read) # 85.40341448783875 s
 
# time_f(selected_read) # 6.774365186691284 s
 
# time_f(less_selected_read) # 5.930811405181885 s
 

	
 
# timeF(shortSockets) # 3.928339719772339 s
 
# timeF(longSockets) # 0.15576839447021484 s
 
# time_f(short_sockets) # 3.928339719772339 s
 
# time_f(long_sockets) # 0.15576839447021484 s
src/client.py
Show inline comments
 
import collections
 
import socket
 
import ssl
 
import logging as log
 
from datetime import datetime
 

	
 
import config as conf
 
import status
 
import stats
 
from util import Progress
 
from hashtree import HashTree, hashBlock
 
from hashtree import HashTree, hash_block
 
from netnode import BaseConnection, NetNode, FailedConnection, LockedException, IncompatibleException
 
from datafile import DataFile
 

	
 

	
 
class DeniedConnection(Exception): pass
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self, host, port):
 
		super().__init__()
 
		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 

	
 
		sslContext = ssl.create_default_context(cafile=conf.peers)
 
		sslContext.check_hostname = False
 
		sslContext.load_cert_chain(conf.certfile, conf.keyfile)
 
		ssl_context = ssl.create_default_context(cafile=conf.peers)
 
		ssl_context.check_hostname = False
 
		ssl_context.load_cert_chain(conf.certfile, conf.keyfile)
 

	
 
		self._socket = sslContext.wrap_socket(sock)
 
		self._socket = ssl_context.wrap_socket(sock)
 

	
 
		try:
 
			self._socket.connect((host, port))
 
		except (ConnectionRefusedError, OSError) as e:
 
			log.exception(e)
 
			print("Couldn't connect to {0}:{1}".format(host, port))
 
			raise FailedConnection()
 
		except ssl.SSLError as e:
 
			log.exception(e)
 
			print("Error creating SSL connection to {0}:{1}".format(host, port))
 
			raise FailedConnection()
 

	
 
		self.createNetworkers()
 
		self.create_networkers()
 
		print("Connected to {0}".format(host))
 

	
 

	
 
class Client(NetNode):
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		print(datetime.now(), "initializing...")
 
		super().__init__(filename, treeFile)
 
		super().__init__(filename, tree_file)
 

	
 
	def init(self, action):
 
		jsonData = {
 
		json_data = {
 
			"command": "init",
 
			"blockSize": self._tree.BLOCK_SIZE,
 
			"blockCount": self._tree.leafCount,
 
			"blockCount": self._tree.leaf_count,
 
			"version": conf.version,
 
			"action": action
 
		}
 
		self._outcoming.writeMsg(jsonData)
 
		jsonData, binData = self._incoming.readMsg()
 
		if jsonData["command"]=="deny":
 
			if jsonData["status"]==status.incompatible.version:
 
		self._outcoming.write_msg(json_data)
 
		json_data, bin_data = self._incoming.read_msg()
 
		if json_data["command"] == "deny":
 
			if json_data["status"] == status.incompatible.version:
 
				raise DeniedConnection("Incompatible client version. Consider upgrading it.")
 
			raise DeniedConnection()
 
		assert jsonData["command"]=="init"
 
		if jsonData["version"]<conf.lowestCompatible:
 
		assert json_data["command"] == "init"
 
		if json_data["version"] < conf.lowest_compatible:
 
			raise IncompatibleException("Incompatible server version. Consider upgrading it.")
 

	
 
	## Asks server for node hashes to determine which are to be transferred.
 
	#
 
	# Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2.
 
	#
 
	# Requests nodes in order of a batch DFS. Needs stack of size O(treeDepth*batchSize). Nodes in each tree level are accessed in order.
 
	# Requests nodes in order of a batch DFS. Needs a stack of size O(tree_depth*batch_size). Nodes in each tree level are accessed in order.
 
	def negotiate(self):
 
		localTree = self._tree
 
		blocksToTransfer = []
 
		nodeStack = collections.deque([0])  # root
 
		local_tree = self._tree
 
		blocks_to_transfer = []
 
		node_stack = collections.deque([0])  # root
 

	
 
		# determine which blocks to send
 
		print(datetime.now(), "negotiating:")
 
		progress = Progress(localTree.leafCount)
 
		while len(nodeStack)>0:
 
		progress = Progress(local_tree.leaf_count)
 
		while len(node_stack) > 0:
 
			indices = []
 
			for i in range(conf.batchSize.hash):
 
				indices.append(nodeStack.pop())
 
				if len(nodeStack)==0: break
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"hash"})
 
			for i in range(conf.batch_size.hash):
 
				indices.append(node_stack.pop())
 
				if len(node_stack) == 0: break
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "hash"})
 

	
 
			jsonData, binData = self._incoming.readMsg()
 
			assert jsonData["index"]==indices
 
			assert jsonData["dataType"]=="hash"
 
			stats.logExchangedNode(len(indices))
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["index"] == indices
 
			assert json_data["dataType"] == "hash"
 
			stats.log_exchanged_node(len(indices))
 

	
 
			frontier = []
 
			for (j, i) in enumerate(indices):
 
				(j1, j2) = [HashTree.HASH_LEN*ji for ji in (j, j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
				if local_tree.store[i] != bin_data[j1:j2]:
 
					# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
 
					if 2*i+2<len(localTree.store): # inner node
 
					if 2*i+2 < len(local_tree.store):  # inner node
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
						blocks_to_transfer.append(i-local_tree.leaf_start)  # leaf
 
						progress.p(i-local_tree.leaf_start)
 
			node_stack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size = stats.formatBytes(len(blocksToTransfer)*self._tree.BLOCK_SIZE)
 
		size = stats.format_bytes(len(blocks_to_transfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocksToTransfer
 
		return blocks_to_transfer
 

	
 
	def sendData(self, blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile = DataFile.open(self._filename)
 
	def send_data(self, blocks_to_transfer):
 
		log.info(blocks_to_transfer)
 
		data_file = DataFile.open(self._filename)
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		for k in range(0, len(blocksToTransfer), conf.batchSize.data):
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = []
 
			blocks = []
 
			for j in range(conf.batchSize.data):
 
				if k+j>=len(blocksToTransfer): break
 
				i = blocksToTransfer[k+j]
 
				block = dataFile.readFrom(i)
 
			for j in range(conf.batch_size.data):
 
				if k+j >= len(blocks_to_transfer): break
 
				i = blocks_to_transfer[k+j]
 
				block = data_file.read_from(i)
 

	
 
				indices.append(i)
 
				blocks.append(block)
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 

	
 
				progress.p(k+j)
 
			if indices: self._sendData(indices, blocks)
 
			if indices: self._send_data(indices, blocks)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end", "action":"push"})
 
		self._outcoming.write_msg({"command": "end", "action": "push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		data_file.close()
 

	
 
	def pullData(self, blocksToTransfer, ignoreLock=False):
 
		if not ignoreLock:
 
	def pull_data(self, blocks_to_transfer, ignore_lock=False):
 
		if not ignore_lock:
 
			try:
 
				self._lock()
 
			except LockedException:
 
				print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				print(
 
					"The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				return
 
		log.info(blocksToTransfer)
 
		dataFile = DataFile.open(self._filename, mode="rb+")
 
		log.info(blocks_to_transfer)
 
		data_file = DataFile.open(self._filename, mode="rb+")
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress = Progress(len(blocksToTransfer))
 
		progress = Progress(len(blocks_to_transfer))
 

	
 
		for k in range(0, len(blocksToTransfer), conf.batchSize.data):
 
			indices = blocksToTransfer[k:k+conf.batchSize.data]
 
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"})
 
			jsonData, binData = self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData
 
		for k in range(0, len(blocks_to_transfer), conf.batch_size.data):
 
			indices = blocks_to_transfer[k:k+conf.batch_size.data]
 
			self._outcoming.write_msg({"command": "req", "index": indices, "dataType": "data"})
 
			json_data, bin_data = self._incoming.read_msg()
 
			assert json_data["command"]=="send" and json_data["index"]==indices and json_data["dataType"]=="data", json_data
 
			for (j, i) in enumerate(indices):
 
				block = binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				dataFile.writeAt(i, block)
 
				block = bin_data[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
 
				data_file.write_at(i, block)
 

	
 
				if self._treeFile:
 
					self._newLeaves[i+self._tree.leafStart] = hashBlock(block)
 
				if self._tree_file:
 
					self._new_leaves[i+self._tree.leaf_start] = hash_block(block)
 

	
 
				log.info("block #{0}: {1}...{2}".format(i, block[:5], block[-5:]))
 

	
 
				stats.logTransferredBlock()
 
				stats.log_transferred_block()
 
				progress.p(k+j)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 
		self._outcoming.write_msg({"command": "end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		data_file.close()
 
		self._unlock()
 

	
 
		if self._treeFile:
 
			self._updateTree()
 
		if self._tree_file:
 
			self._update_tree()
 

	
 
	def _sendData(self, indices, blocks):
 
		jsonData = {"command":"send", "index":indices, "dataType":"data"}
 
		binData = b"".join(blocks)
 
		self._outcoming.writeMsg(jsonData, binData)
 
		stats.logTransferredBlock(len(indices))
 
		jsonData, binData = self._incoming.readMsg()
 
		assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData
 
	def _send_data(self, indices, blocks):
 
		json_data = {"command": "send", "index": indices, "dataType": "data"}
 
		bin_data = b"".join(blocks)
 
		self._outcoming.write_msg(json_data, bin_data)
 
		stats.log_transferred_block(len(indices))
 
		json_data, bin_data = self._incoming.read_msg()
 
		assert json_data["command"]=="ack" and json_data["index"]==indices, json_data
 

	
 
	def setConnection(self, connection):
 
	def set_connection(self, connection):
 
		(self._incoming, self._outcoming) = connection
src/config.py
Show inline comments
 
import os
 
import json
 
import logging as log
 
from logging.handlers import TimedRotatingFileHandler
 

	
 

	
 
directory = os.path.join(os.path.dirname(__file__), "..")
 
configFile = os.path.join(directory, "config.json")
 
config_file = os.path.join(directory, "config.json")
 
conf = dict()
 
if os.path.isfile(configFile):
 
	with open(configFile) as f: conf = json.load(f)
 
if os.path.isfile(config_file):
 
	with open(config_file) as f: conf = json.load(f)
 

	
 
logFile = conf.get("logFile", "/var/log/morevna/mor.log")
 
log_file = conf.get("logFile", "/var/log/morevna/mor.log")
 

	
 
logger = log.getLogger()
 
logger.setLevel(log.INFO)
 
formatter = log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
 
handler = TimedRotatingFileHandler(logFile, when="midnight", backupCount=9)
 
handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=9)
 
handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 
certfile = os.path.join(directory,"certs/cert.pem")
 
keyfile = os.path.join(directory,"certs/key.pem")
 
peers = os.path.join(directory,"certs/peers.pem")
 

	
 
version = [0, 1, 1]
 
lowestCompatible=[0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway
 
lowest_compatible = [0, 1, 0] # tuple is more fitting but json conversion transforms it into a list anyway
 

	
 
hosts=conf.get("hosts", ["127.0.0.1"])
 
port=conf.get("port", 9901)
 
hosts = conf.get("hosts", ["127.0.0.1"])
 
port = conf.get("port", 9901)
 

	
 
bSize=conf.get("batchSize", dict())
 
class batchSize:
 
	hash=bSize.get("hash", 256)
 
	data=bSize.get("data", 64)
 
bSize = conf.get("batch_size", dict())
 
class batch_size:
 
	hash = bSize.get("hash", 256)
 
	data = bSize.get("data", 64)
src/datafile.py
Show inline comments
 
from hashtree import HashTree
 

	
 

	
 
BLOCK_SIZE = HashTree.BLOCK_SIZE
 

	
 

	
 
class DataFile:
 
	def __init__(self, fileHandle):
 
		self._lastIndex = 0
 
		self._f = fileHandle
 
	def __init__(self, file_handle):
 
		self._last_index = 0
 
		self._f = file_handle
 

	
 
	@staticmethod
 
	def open(filename, mode="rb"):
 
		return DataFile(open(filename, mode=mode))
 

	
 
	def writeAt(self, i, blockData):
 
		if i!=self._lastIndex+1:
 
	def write_at(self, i, block_data):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._f.write(blockData)
 
		self._lastIndex = i
 
		self._f.write(block_data)
 
		self._last_index = i
 

	
 
	def readFrom(self, i, byteCount=BLOCK_SIZE):
 
		if i!=self._lastIndex+1:
 
	def read_from(self, i, byte_count=BLOCK_SIZE):
 
		if i!=self._last_index+1:
 
			self._f.seek(i*BLOCK_SIZE)
 
		self._lastIndex = i
 
		return self._f.read(byteCount)
 
		self._last_index = i
 
		return self._f.read(byte_count)
 

	
 
	def close(self):
 
		self._f.close()
src/hashtree.py
Show inline comments
 
import collections
 
import hashlib
 
import os
 
from datetime import datetime
 

	
 
from util import Progress
 

	
 

	
 
def hashBlock(data):
 
def hash_block(data):
 
	return hashlib.sha256(data).digest()[-HashTree.HASH_LEN:]
 

	
 

	
 
class HashTree:
 
	HASH_LEN = 16 # bytes
 
	BLOCK_SIZE = 4096 # bytes
 
	HASH_LEN = 16  # bytes
 
	BLOCK_SIZE = 4096  # bytes
 
	
 
	## Prepares a tree containing leafCount leaves.
 
	def __init__(self, leafCount):
 
		self.store = [b""]*(leafCount*2-1)
 
		self.leafStart = leafCount-1
 
		self.leafCount = leafCount
 
		self._index = self.leafStart
 
	## Prepares a tree containing leaf_count leaves.
 
	def __init__(self, leaf_count):
 
		self.store = [b""]*(leaf_count*2-1)
 
		self.leaf_start = leaf_count-1
 
		self.leaf_count = leaf_count
 
		self._index = self.leaf_start
 
		
 
	@classmethod
 
	def fromFile(cls, filename):
 
	def from_file(cls, filename):
 
		with open(filename, "rb") as f:
 
			stat = os.fstat(f.fileno())
 
			size = stat.st_size # !! symlinks
 
			leafCount = (size-1)//HashTree.BLOCK_SIZE+1 # number of leaf blocks
 
			res = cls(leafCount)
 
			size = stat.st_size  # !! symlinks
 
			leaf_count = (size-1)//HashTree.BLOCK_SIZE+1  # number of leaf blocks
 
			res = cls(leaf_count)
 
			print(datetime.now(), "hashing file:")
 

	
 
			progress = Progress(leafCount)
 
			for i in range(leafCount):
 
			progress = Progress(leaf_count)
 
			for i in range(leaf_count):
 
				data = f.read(HashTree.BLOCK_SIZE)
 
				res.insertLeaf(hashBlock(data))
 
				res.insert_leaf(hash_block(data))
 

	
 
				progress.p(i)
 
			progress.done()
 
		res.buildTree()
 
		res.build_tree()
 
		
 
		return res
 

	
 
	@classmethod
 
	def load(cls, filename):
 
		with open(filename, "rb") as f:
 
			stat = os.fstat(f.fileno())
 
			size = stat.st_size
 
			nodeCount = size//HashTree.HASH_LEN
 
			res = cls((nodeCount+1)//2)
 
			node_count = size//HashTree.HASH_LEN
 
			res = cls((node_count+1)//2)
 

	
 
			for i in range(nodeCount):
 
			for i in range(node_count):
 
				res.store[i] = f.read(HashTree.HASH_LEN)
 
		return res
 

	
 
	def save(self, filename):
 
		with open(filename, "wb") as f:
 
			for h in self.store:
 
				f.write(h)
 
		
 
	## Inserts a leaf at the first empty position.
 
	#
 
	#	Useful and used only during the tree construction.
 
	def insertLeaf(self,h):
 
	def insert_leaf(self, h):
 
		self.store[self._index] = h
 
		self._index += 1
 
		
 
	## Updates a hash stored in the leaf.
 
	def updateLeaf(self, index, h):
 
		if index<self.leafStart: raise IndexError()
 
	def update_leaf(self, index, h):
 
		if index<self.leaf_start: raise IndexError()
 
		
 
		self.store[index] = h
 
		self.updateNode((index-1)//2)
 
		self.update_node((index-1)//2)
 
	
 
	## Updates the node at index and all its ancestors.
 
	def updateNode(self, index):
 
	def update_node(self, index):
 
		while index>=0:
 
			self.store[index] = hashBlock(self.store[index*2+1]+self.store[index*2+2])
 
			self.store[index] = hash_block(self.store[index*2+1] + self.store[index*2+2])
 
			index = (index-1)//2
 
			
 
	## Fast construction of the tree over the leaves. O(n).
 
	def buildTree(self):
 
	def build_tree(self):
 
		print(datetime.now(), "building tree:")
 
		progress=Progress(-1, self.leafStart-1)
 
		for i in range(self.leafStart-1, -1, -1):
 
			self.store[i] = hashBlock(self.store[i*2+1]+self.store[i*2+2])
 
		progress = Progress(-1, self.leaf_start-1)
 
		for i in range(self.leaf_start-1, -1, -1):
 
			self.store[i] = hash_block(self.store[i*2+1] + self.store[i*2+2])
 
			progress.p(i)
 
		progress.done()
 

	
 
	## Update faster than repeated insertLeaf.
 
	def batchUpdate(self, keysHashes):
 
	def batch_update(self, keys_hashes):
 
		queue = collections.deque()
 
		for (k, v) in sorted(keysHashes):
 
		for (k, v) in sorted(keys_hashes):
 
			self.store[k] = v
 
			parentK = (k-1)//2
 
			if len(queue)==0 or queue[-1]!=parentK:
 
				queue.append(parentK)
 
			parent_k = (k-1)//2
 
			if len(queue)==0 or queue[-1]!=parent_k:
 
				queue.append(parent_k)
 

	
 
		while len(queue)>0:
 
			k = queue.pop()
 
			self.store[k] = hashBlock(self.store[k*2+1]+self.store[k*2+2])
 
			parentK = (k-1)//2
 
			if (len(queue)==0 or queue[0]!=parentK) and k!=0:
 
				queue.appendleft(parentK)
 
			self.store[k] = hash_block(self.store[k*2+1] + self.store[k*2+2])
 
			parent_k = (k-1)//2
 
			if (len(queue)==0 or queue[0]!=parent_k) and k!=0:
 
				queue.appendleft(parent_k)
src/morevna.py
Show inline comments
 
import sys
 
import os.path
 
import logging as log
 
from argparse import ArgumentParser
 

	
 
from util import spawnDaemon, splitHost
 
from util import spawn_daemon, split_host
 
import config as conf
 
import stats
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection, IncompatibleException
 
from server import Miniserver
 

	
 

	
 
def _checkFile(f):
 
def _check_file(f):
 
	if not os.path.isfile(f):
 
		print("invalid file specified:", f, file=sys.stderr)
 
		sys.exit(1)
 

	
 

	
 
def buildTree(args):
 
	_checkFile(args.datafile)
 
def build_tree(args):
 
	_check_file(args.datafile)
 
	if os.path.isfile(args.treefile):
 
		treeMod = os.stat(args.treefile).st_mtime
 
		dataMod = os.stat(args.datafile).st_mtime
 
		if dataMod<treeMod and not args.force:
 
		tree_mod = os.stat(args.treefile).st_mtime
 
		data_mod = os.stat(args.datafile).st_mtime
 
		if data_mod<tree_mod and not args.force:
 
			print("tree file is up to date")
 
			return
 

	
 
	tree = HashTree.fromFile(args.datafile)
 
	tree = HashTree.from_file(args.datafile)
 
	tree.save(args.treefile)
 

	
 

	
 
def push(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts = [args.host]
 
	if args.port: conf.port = args.port
 

	
 
	c = Client(args.datafile,args.tree)
 
	c = Client(args.datafile, args.tree)
 
	for h in conf.hosts:
 
		host = splitHost(h,conf.port)
 
		host = split_host(h, conf.port)
 
		stats.reset()
 
		try:
 
			with ClientConnection(*host) as con:
 
				c.setConnection(con)
 
				c.set_connection(con)
 
				c.init("push")
 
				blocksToTransfer = c.negotiate()
 
				c.sendData(blocksToTransfer)
 
				blocks_to_transfer = c.negotiate()
 
				c.send_data(blocks_to_transfer)
 
			print()
 
			print(stats.report())
 
			print()
 
		except FailedConnection: pass
 
		except DeniedConnection as e:
 
			print("Server {0}:{1} denied connection.".format(*host))
 
			print(e)
 
		except IncompatibleException as e: print(e)
 

	
 

	
 
def pull(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts = [args.host]
 
	if args.port: conf.port = args.port
 

	
 
	c = Client(args.datafile, args.tree)
 
	host = splitHost(conf.hosts[0], conf.port)
 
	host = split_host(conf.hosts[0], conf.port)
 
	try:
 
		with ClientConnection(*host) as con:
 
			c.setConnection(con)
 
			c.set_connection(con)
 
			c.init("pull")
 
			blocksToTransfer = c.negotiate()
 
			c.pullData(blocksToTransfer, args.force)
 
			blocks_to_transfer = c.negotiate()
 
			c.pull_data(blocks_to_transfer, args.force)
 
		print()
 
		print(stats.report())
 
	except FailedConnection: pass
 
	except DeniedConnection as e:
 
		print("Server {0}:{1} denied connection.".format(*host))
 
		print(e)
 

	
 

	
 
def serve(args):
 
	_checkFile(args.datafile)
 
	_check_file(args.datafile)
 
	if args.tree:
 
		_checkFile(args.tree)
 
		_check_file(args.tree)
 
	if args.host: conf.hosts.insert(0, args.host)
 
	if args.port: conf.port = args.port
 

	
 
	try:
 
		s = Miniserver(args.datafile,args.tree)
 
		spawnDaemon(s.serve)
 
		s = Miniserver(args.datafile, args.tree)
 
		spawn_daemon(s.serve)
 
	except Exception as e:
 
		log.exception("exception: %s", e)
 
		print("Failed to start:\n  ", e)
 

	
 

	
 
parser = ArgumentParser()
 
subparsers = parser.add_subparsers()
 

	
 
pBuild = subparsers.add_parser("build")
 
pBuild.add_argument("-f", "--force", action="store_true", help="force tree rebuild")
 
pBuild.add_argument("treefile", help="stored hash tree location")
 
pBuild.add_argument("datafile")
 
pBuild.set_defaults(func=buildTree)
 
p_build = subparsers.add_parser("build")
 
p_build.add_argument("-f", "--force", action="store_true", help="force tree rebuild")
 
p_build.add_argument("treefile", help="stored hash tree location")
 
p_build.add_argument("datafile")
 
p_build.set_defaults(func=build_tree)
 

	
 
pUpdate = subparsers.add_parser("push")
 
pUpdate.add_argument("-p", "--port", type=int)
 
pUpdate.add_argument("--host")
 
pUpdate.add_argument("-t", "--tree", help="stored hash tree location")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=push)
 
p_update = subparsers.add_parser("push")
 
p_update.add_argument("-p", "--port", type=int)
 
p_update.add_argument("--host")
 
p_update.add_argument("-t", "--tree", help="stored hash tree location")
 
p_update.add_argument("datafile")
 
p_update.set_defaults(func=push)
 

	
 
pUpdate = subparsers.add_parser("pull")
 
pUpdate.add_argument("-p", "--port", type=int)
 
pUpdate.add_argument("--host")
 
pUpdate.add_argument("-t", "--tree", help="stored hash tree location")
 
pUpdate.add_argument("-f", "--force", action="store_true", help="ignore lock file")
 
pUpdate.add_argument("datafile")
 
pUpdate.set_defaults(func=pull)
 
p_update = subparsers.add_parser("pull")
 
p_update.add_argument("-p", "--port", type=int)
 
p_update.add_argument("--host")
 
p_update.add_argument("-t", "--tree", help="stored hash tree location")
 
p_update.add_argument("-f", "--force", action="store_true", help="ignore lock file")
 
p_update.add_argument("datafile")
 
p_update.set_defaults(func=pull)
 

	
 
pServe = subparsers.add_parser("serve")
 
pServe.add_argument("-p", "--port", type=int)
 
pServe.add_argument("--host")
 
pServe.add_argument("-t", "--tree", help="stored hash tree location")
 
pServe.add_argument("datafile")
 
pServe.set_defaults(func=serve)
 
p_serve = subparsers.add_parser("serve")
 
p_serve.add_argument("-p", "--port", type=int)
 
p_serve.add_argument("--host")
 
p_serve.add_argument("-t", "--tree", help="stored hash tree location")
 
p_serve.add_argument("datafile")
 
p_serve.set_defaults(func=serve)
 

	
 
args = parser.parse_args()
 
try: args.func(args)
 
except AttributeError:
 
	parser.print_help()
src/netnode.py
Show inline comments
 
import os
 
import socket
 
import logging as log
 

	
 
import config as conf
 
from networkers import NetworkReader,NetworkWriter
 
from networkers import NetworkReader, NetworkWriter
 
from hashtree import HashTree
 

	
 

	
 
lockFile = os.path.join(conf.directory,"dirty.lock")
 
lock_file = os.path.join(conf.directory, "dirty.lock")
 

	
 

	
 
class FailedConnection(Exception): pass
 
class LockedException(Exception): pass
 
class IncompatibleException(Exception): pass
 

	
 

	
 
class BaseConnection: # abstract
 
class BaseConnection:  # abstract
 
	def __init__(self):
 
		self._socket = None
 
		self.incoming = None
 
		self.outcoming = None
 

	
 
	def createNetworkers(self):
 
	def create_networkers(self):
 
		fr = self._socket.makefile(mode="rb")
 
		fw = self._socket.makefile(mode="wb")
 

	
 
		self.incoming = NetworkReader(fr)
 
		self.outcoming = NetworkWriter(fw)
 

	
 
	def __enter__(self):
 
		return self.incoming, self.outcoming
 

	
 
	def __exit__(self, exc_type, exc_val, exc_tb):
 
		try:
 
			self._socket.shutdown(socket.SHUT_RDWR)
 
			self._socket.close()
 
		except OSError:
 
			log.warning("broken connection")
 

	
 

	
 
class NetNode:
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		self._incoming = None
 
		self._outcoming = None
 

	
 
		self._filename = filename
 
		self._treeFile = treeFile
 
		self._tree_file = tree_file
 

	
 
		if treeFile:
 
			self._tree = HashTree.load(treeFile)
 
		if tree_file:
 
			self._tree = HashTree.load(tree_file)
 
		else:
 
			self._tree = HashTree.fromFile(filename)
 
			self._tree = HashTree.from_file(filename)
 

	
 
		self._newLeaves = dict()
 
		self._new_leaves = dict()
 

	
 
	def isLocked(self):
 
		return os.path.isfile(lockFile)
 
	def is_locked(self):
 
		return os.path.isfile(lock_file)
 

	
 
	def _lock(self):
 
		try:
 
			f = open(lockFile,"x")
 
			f = open(lock_file, "x")
 
			f.close()
 
		except FileExistsError:
 
			raise LockedException()
 

	
 
	def _unlock(self):
 
		os.remove(lockFile)
 
		os.remove(lock_file)
 

	
 
	def _updateTree(self):
 
	def _update_tree(self):
 
		log.info("updating hash tree...")
 
		self._tree.batchUpdate(self._newLeaves.items())
 
		self._tree.save(self._treeFile)
 
		self._tree.batch_update(self._new_leaves.items())
 
		self._tree.save(self._tree_file)
 
		log.info("tree updated")
src/networkers.py
Show inline comments
 
import json
 

	
 
import stats
 

	
 

	
 
class NetworkReader:
 
	def __init__(self, stream):
 
		self._stream = stream
 

	
 
	def readMsg(self):
 
	def read_msg(self):
 
		data = self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		jsonLength = int(data.split(b":")[1].strip()) # "json-length: length" -> length
 
		stats.log_received(data)
 
		json_length = int(data.split(b":")[1].strip())  # "json-length: length" -> length
 

	
 
		data = self._stream.readline()
 
		assert data
 
		stats.logReceived(data)
 
		binLength = int(data.split(b":")[1].strip()) # "bin-length: length" -> length
 
		stats.log_received(data)
 
		bin_length = int(data.split(b":")[1].strip())  # "bin-length: length" -> length
 

	
 
		jsonData = self._stream.read(jsonLength)
 
		assert len(jsonData)==jsonLength
 
		stats.logReceived(jsonData)
 
		jsonData = json.loads(str(jsonData,encoding="utf-8"))
 
		json_data = self._stream.read(json_length)
 
		assert len(json_data)==json_length
 
		stats.log_received(json_data)
 
		json_data = json.loads(str(json_data, encoding="utf-8"))
 

	
 
		binData = self._stream.read(binLength)
 
		assert len(binData)==binLength
 
		stats.logReceived(binData)
 
		bin_data = self._stream.read(bin_length)
 
		assert len(bin_data)==bin_length
 
		stats.log_received(bin_data)
 
		
 
		return (jsonData, binData)
 
		return (json_data, bin_data)
 
		
 

	
 
class NetworkWriter:
 
	def __init__(self, stream):
 
		self._stream = stream
 

	
 
	def writeMsg(self, *args):
 
		msg = self.prepMsg(*args)
 
	def write_msg(self, *args):
 
		msg = self.prep_msg(*args)
 
		self._stream.write(msg)
 
		self._stream.flush()
 
		stats.logSent(msg)
 
		stats.log_sent(msg)
 

	
 
	def prepMsg(self, jsonData, binData=b""):
 
		jsonData = bytes(json.dumps(jsonData,separators=(',',':'))+"\n", encoding="utf-8")
 
		jsonLength = bytes("json-length: "+str(len(jsonData))+"\n", encoding="utf-8")
 
		binLength = bytes("bin-length: "+str(len(binData))+"\n", encoding="utf-8")
 
	def prep_msg(self, json_data, bin_data=b""):
 
		json_data = bytes(json.dumps(json_data, separators=(',', ':'))+"\n", encoding="utf-8")
 
		json_length = bytes("json-length: "+str(len(json_data))+"\n", encoding="utf-8")
 
		bin_length = bytes("bin-length: "+str(len(bin_data))+"\n", encoding="utf-8")
 

	
 
		return b"".join((jsonLength, binLength, jsonData, binData))
 
		return b"".join((json_length, bin_length, json_data, bin_data))
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from hashtree import hash_block
 
from netnode import BaseConnection,NetNode
 
import config as conf
 
import status
 
from datafile import DataFile
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self, serverSocket, sslContext):
 
	def __init__(self, server_socket, ssl_context):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		sock, address = server_socket.accept()
 
		peer = sock.getpeername()
 
		try:
 
			self._socket = sslContext.wrap_socket(sock, server_side=True)
 
			self._socket = ssl_context.wrap_socket(sock, server_side=True)
 
		except (ssl.SSLError,OSError) as e:
 
			log.warning("Failed to establish an SSL connection from {0}.".format(peer))
 
			raise e
 

	
 
		log.info('Connected by {0}'.format(address))
 
		self.createNetworkers()
 
		self.create_networkers()
 

	
 

	
 
class Miniserver:
 
	def __init__(self, filename, treeFile=""):
 
	def __init__(self, filename, tree_file=""):
 
		self._filename = filename
 
		self._treeFile = treeFile
 
		self._tree_file = tree_file
 

	
 
		self._ssl = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile=conf.peers)
 
		self._ssl.verify_mode = ssl.CERT_REQUIRED
 
		self._ssl.load_cert_chain(conf.certfile, conf.keyfile)
 

	
 
		self._ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
		self._ss.bind(("", conf.port))
 
		self._ss.listen(1)
 

	
 
	def serve(self):
 
		p = None
 
		with self._ss:
 
			while True:
 
				try: connection = Connection(self._ss, self._ssl)
 
				except (ssl.SSLError, OSError): continue
 
				if p and p.is_alive():
 
					with connection as c:
 
						c[0].readMsg()
 
						c[1].writeMsg({"command":"deny", "status":status.locked})
 
						c[0].read_msg()
 
						c[1].write_msg({"command": "deny", "status":status.locked})
 
					continue
 
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._treeFile))
 
				p = multiprocessing.Process(target=Server.run, args=(connection, self._filename, self._tree_file))
 
				p.start()
 

	
 

	
 
class Server(NetNode):
 
	def __init__(self, connection, filename, treeFile=""):
 
		super().__init__(filename, treeFile)
 
	def __init__(self, connection, filename, tree_file=""):
 
		super().__init__(filename, tree_file)
 
		(self._incoming, self._outcoming) = connection
 

	
 
		self.BLOCK_SIZE = self._tree.BLOCK_SIZE
 

	
 
		self._dataFileObj = None
 
		self._data_file_obj = None
 

	
 
	@staticmethod
 
	def run(connection, *args):
 
		with connection as c:
 
			s = Server(c,*args)
 
			s = Server(c, *args)
 
			s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileObj:
 
			self._dataFileObj = DataFile.open(self._filename, mode="rb+")
 
		return self._dataFileObj
 
	def _data_file(self):
 
		if not self._data_file_obj:
 
			self._data_file_obj = DataFile.open(self._filename, mode="rb+")
 
		return self._data_file_obj
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
			while self._serve_one(): pass
 
		except (AssertionError, ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData, binData = self._incoming.readMsg()
 
	def _serve_one(self):
 
		json_data, bin_data = self._incoming.read_msg()
 

	
 
		if jsonData["command"]=="init":
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.parameters})
 
			if jsonData["version"]<conf.lowestCompatible:
 
				self._outcoming.writeMsg({"command":"deny", "status":status.incompatible.version})
 
			if jsonData["action"]=="pull" and self.isLocked():
 
				self._outcoming.writeMsg({"command":"deny", "status":status.locked})
 
			if jsonData["action"]=="push" and not self.isLocked():
 
		if json_data["command"]=="init":
 
			if json_data["blockSize"]!=self.BLOCK_SIZE or json_data["blockCount"]!=self._tree.leaf_count:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.parameters})
 
			if json_data["version"]<conf.lowest_compatible:
 
				self._outcoming.write_msg({"command": "deny", "status": status.incompatible.version})
 
			if json_data["action"]=="pull" and self.is_locked():
 
				self._outcoming.write_msg({"command": "deny", "status": status.locked})
 
			if json_data["action"]=="push" and not self.is_locked():
 
				self._lock()
 

	
 
			self._outcoming.writeMsg({"command":"init", "version":conf.version})
 
			self._outcoming.write_msg({"command": "init", "version": conf.version})
 

	
 
		elif jsonData["command"]=="req":
 
			if jsonData["dataType"]=="data":
 
				self._outcoming.writeMsg(*self._requestData(jsonData["index"]))
 
		elif json_data["command"]=="req":
 
			if json_data["dataType"]=="data":
 
				self._outcoming.write_msg(*self._request_data(json_data["index"]))
 
			else:
 
				self._outcoming.writeMsg(*self._requestHash(jsonData["index"]))
 
				self._outcoming.write_msg(*self._request_hash(json_data["index"]))
 

	
 
		elif jsonData["command"]=="send" and jsonData["dataType"]=="data":
 
			self._outcoming.writeMsg(*self._receiveData(jsonData, binData))
 
		elif json_data["command"]=="send" and json_data["dataType"]=="data":
 
			self._outcoming.write_msg(*self._receive_data(json_data, bin_data))
 

	
 
		elif jsonData["command"]=="end":
 
		elif json_data["command"]=="end":
 
			self._finalize()
 
			if jsonData.get("action")=="push": self._unlock()
 
			if json_data.get("action")=="push": self._unlock()
 
			return False
 

	
 
		else:
 
			assert False, jsonData["command"]
 
			assert False, json_data["command"]
 

	
 
		return True
 

	
 
	def _requestHash(self, indices):
 
	def _request_hash(self, indices):
 
		log.info("received request for nodes {0}".format(indices))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes = [self._tree.store[i] for i in indices]
 

	
 
		jsonResponse = {"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse = b"".join(hashes)
 
		json_response = {"command": "send", "index": indices, "dataType": "hash"}
 
		bin_response = b"".join(hashes)
 

	
 
		return (jsonResponse, binResponse)
 
		return (json_response, bin_response)
 

	
 
	def _requestData(self, index):
 
	def _request_data(self, index):
 
		log.info("received request for data blocks {0}".format(index))
 

	
 
		jsonResponse = {"command":"send", "index":index, "dataType":"data"}
 
		json_response = {"command": "send", "index": index, "dataType": "data"}
 
		blocks = []
 
		for i in index:
 
			blocks.append(self._dataFile.readFrom(i))
 
			blocks.append(self._data_file.read_from(i))
 

	
 
		return (jsonResponse, b"".join(blocks))
 
		return (json_response, b"".join(blocks))
 

	
 
	def _receiveData(self, jsonData, binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(jsonData["index"], binData[:5], binData[-5:]))
 
	def _receive_data(self, json_data, bin_data):
 
		if not self.is_locked(): self._lock()
 
		log.info("received data blocks {0}: {1}...{2}".format(json_data["index"], bin_data[:5], bin_data[-5:]))
 

	
 
		indices = jsonData["index"]
 
		indices = json_data["index"]
 
		for (i, k) in enumerate(indices):
 
			block = binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._dataFile.writeAt(k, block)
 
			if self._treeFile:
 
				self._newLeaves[k+self._tree.leafStart] = hashBlock(block)
 
			block = bin_data[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._data_file.write_at(k, block)
 
			if self._tree_file:
 
				self._new_leaves[k+self._tree.leaf_start] = hash_block(block)
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileObj = None
 
		if self._treeFile:
 
			self._updateTree()
 
		self._data_file.close()
 
		self._data_file_obj = None
 
		if self._tree_file:
 
			self._update_tree()
 
		log.info("done")
src/stats.py
Show inline comments
 
class Stats:
 
	def __init__(self):
 
		self.received = 0
 
		self.sent = 0
 
		self.exchangedNodes = 0
 
		self.transferredBlocks = 0
 
		self.exchanged_nodes = 0
 
		self.transferred_blocks = 0
 

	
 

	
 
stats = Stats()
 

	
 

	
 
def logReceived(data):
 
def log_received(data):
 
	stats.received += len(data)
 

	
 

	
 
def logSent(data):
 
def log_sent(data):
 
	stats.sent += len(data)
 

	
 

	
 
def logExchangedNode(k=1):
 
	stats.exchangedNodes += k
 
def log_exchanged_node(k=1):
 
	stats.exchanged_nodes += k
 

	
 

	
 
def logTransferredBlock(k=1):
 
	stats.transferredBlocks += k
 
def log_transferred_block(k=1):
 
	stats.transferred_blocks += k
 

	
 

	
 
def reset():
 
	global stats
 
	stats = Stats()
 

	
 

	
 
def report():
 
	return """received {rf} ({r:,} B)
 
sent {sf} ({s:,} B)
 
exchanged {nodes:,} hash tree nodes
 
transferred {blocks:,} blocks""".format(
 
		rf=formatBytes(stats.received),
 
		rf=format_bytes(stats.received),
 
		r=stats.received,
 
		sf=formatBytes(stats.sent),
 
		sf=format_bytes(stats.sent),
 
		s=stats.sent,
 
		nodes=stats.exchangedNodes,
 
		blocks=stats.transferredBlocks
 
		nodes=stats.exchanged_nodes,
 
		blocks=stats.transferred_blocks
 
	)
 

	
 

	
 
def formatBytes(x):
 
def format_bytes(x):
 
	exts = ["B", "kiB", "MiB", "GiB", "TiB", "PiB"]
 
	i = 0
 
	while x>1024:
 
		x /= 1024
 
		i += 1
 
	if x>=100: x = round(x)
 
	elif x>=10: x = round(x, 1)
 
	else: x = round(x, 2)
 
	return "{0} {1}".format(x, exts[i])
src/tests/test_hashtree.py
Show inline comments
 
import random
 
from unittest import TestCase
 

	
 
from hashtree import HashTree
 
from . import RedirectedOutput
 

	
 

	
 
random.seed(17)
 

	
 

	
 
def buildTree(leaves):
 
def build_tree(leaves):
 
	tree = HashTree(len(leaves))
 
	for l in leaves:
 
		tree.insertLeaf(l)
 
	tree.buildTree()
 
		tree.insert_leaf(l)
 
	tree.build_tree()
 
	return tree
 

	
 

	
 
class TestMorevna(RedirectedOutput, TestCase):
 
	def test_batchUpdate(self):
 
	def test_batch_update(self):
 
		leaves = [b"a" for i in range(8)]
 
		t1 = buildTree(leaves)
 
		t1 = build_tree(leaves)
 
		keys = list(range(8))
 

	
 
		for i in range(8):
 
			random.shuffle(keys)
 
			for k in keys[:i+1]:
 
				leaves[k] = bytes([random.randrange(256)])
 
			t2 = buildTree(leaves)
 
			t1.batchUpdate((k+t1.leafStart, leaves[k]) for k in keys[:i+1])
 
			t2 = build_tree(leaves)
 
			t1.batch_update((k + t1.leaf_start, leaves[k]) for k in keys[:i + 1])
 
			self.assertEqual(t1.store, t2.store)
src/tests/test_overall.py
Show inline comments
 
@@ -8,109 +8,109 @@ from unittest import TestCase
 
import config
 
from hashtree import HashTree
 
from client import Client, Connection as ClientConnection, DeniedConnection
 
from server import Miniserver
 
from . import RedirectedOutput
 

	
 

	
 
config.logger.removeHandler(config.handler)
 
handler = FileHandler("/tmp/morevna.log")
 
handler.setFormatter(config.formatter)
 
config.logger.addHandler(handler)
 

	
 
config.batchSize.hash = 8
 
config.batchSize.data = 8
 
config.batch_size.hash = 8
 
config.batch_size.data = 8
 

	
 
dataDir = os.path.join(config.directory, "src/tests/data")
 
filename = os.path.join(dataDir, "test.img")
 
data_dir = os.path.join(config.directory, "src/tests/data")
 
filename = os.path.join(data_dir, "test.img")
 

	
 

	
 
def compareFiles(f1, f2):
 
def compare_files(f1, f2):
 
	with open(f1, mode="rb") as f:
 
		h2 = hashlib.sha256(f.read()).hexdigest()
 
	with open(f2, mode="rb") as f:
 
		h = hashlib.sha256(f.read()).hexdigest()
 
	return (h, h2)
 

	
 

	
 
class TestMorevna(RedirectedOutput, TestCase):
 
	_stdout = None
 

	
 
	def setUp(self):
 
		src = os.path.join(dataDir, "test1.img")
 
		src = os.path.join(data_dir, "test1.img")
 
		shutil.copyfile(src, filename)
 

	
 
	@classmethod
 
	def tearDownClass(cls):
 
		super().tearDownClass()
 
		os.remove(filename)
 

	
 
	def test_build(self):
 
		treeFile = os.path.join(dataDir, "test.bin")
 
		refFile = os.path.join(dataDir, "test1.bin")
 
		tree_file = os.path.join(data_dir, "test.bin")
 
		ref_file = os.path.join(data_dir, "test1.bin")
 

	
 
		tree = HashTree.fromFile(os.path.join(dataDir, "test1.img"))
 
		tree.save(treeFile)
 
		tree = HashTree.from_file(os.path.join(data_dir, "test1.img"))
 
		tree.save(tree_file)
 

	
 
		self.assertEqual(*compareFiles(refFile, treeFile))
 
		self.assertEqual(*compare_files(ref_file, tree_file))
 

	
 
		os.remove(treeFile)
 
		os.remove(tree_file)
 

	
 
	def test_push(self):
 
		config.port += 1
 
		ms = Miniserver(filename)
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		for clientFile in ("test2.img", "test3.img", "test4.img"):
 
			clientFile = os.path.join(dataDir, clientFile)
 
			c = Client(clientFile)
 
		for client_file in ("test2.img", "test3.img", "test4.img"):
 
			client_file = os.path.join(data_dir, client_file)
 
			c = Client(client_file)
 
			with ClientConnection("127.0.0.1", config.port) as con:
 
				c.setConnection(con)
 
				c.set_connection(con)
 
				c.init("push")
 
				blocksToTransfer = c.negotiate()
 
				c.sendData(blocksToTransfer)
 
				blocks_to_transfer = c.negotiate()
 
				c.send_data(blocks_to_transfer)
 

	
 
			self.assertEqual(*compareFiles(clientFile, filename))
 
			self.assertEqual(*compare_files(client_file, filename))
 

	
 
		p.terminate()
 
		p.join()
 

	
 
	def test_pull(self):
 
		config.port += 1
 
		serverFile = os.path.join(dataDir, "test3.img")
 
		ms = Miniserver(serverFile)
 
		server_file = os.path.join(data_dir, "test3.img")
 
		ms = Miniserver(server_file)
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		c = Client(filename)
 
		with ClientConnection("127.0.0.1", config.port) as con:
 
			c.setConnection(con)
 
			c.set_connection(con)
 
			c.init("pull")
 
			blocksToTransfer = c.negotiate()
 
			c.pullData(blocksToTransfer)
 
			blocks_to_transfer = c.negotiate()
 
			c.pull_data(blocks_to_transfer)
 

	
 
		self.assertEqual(*compareFiles(serverFile, filename))
 
		self.assertEqual(*compare_files(server_file, filename))
 

	
 
		p.terminate()
 
		p.join()
 

	
 
	def test_deny(self):
 
		config.port += 1
 
		ms = Miniserver(filename)
 
		p = multiprocessing.Process(target=ms.serve)
 
		p.start()
 

	
 
		c1 = Client(os.path.join(dataDir, "test2.img"))
 
		c1 = Client(os.path.join(data_dir, "test2.img"))
 
		with ClientConnection("127.0.0.1", config.port) as con1:
 
			c1.setConnection(con1)
 
			c1.set_connection(con1)
 
			c1.init("push")
 

	
 
			c2 = Client(os.path.join(dataDir, "test3.img"))
 
			c2 = Client(os.path.join(data_dir, "test3.img"))
 
			with ClientConnection("127.0.0.1", config.port) as con2:
 
				c2.setConnection(con2)
 
				c2.set_connection(con2)
 
				with self.assertRaises(DeniedConnection):
 
					c2.init("push")
 
			c1.sendData([]) # to unlock the server
 
			c1.send_data([]) # to unlock the server
 

	
 
		p.terminate()
 
		p.join()
src/util.py
Show inline comments
 
import math
 
import os
 
import sys
 
import collections
 
from datetime import datetime
 

	
 

	
 
def spawnDaemon(fun):
 
def spawn_daemon(fun):
 
	# do the UNIX double-fork magic, see Stevens' "Advanced
 
	# Programming in the UNIX Environment" for details (ISBN 0201563177)
 
	try:
 
		pid = os.fork()
 
		if pid > 0:
 
			# parent process, return and keep running
 
			return
 
	except OSError as e:
 
		print("fork #1 failed: {0} ({1})".format(e.errno,e.strerror),file=sys.stderr)
 
		print("fork #1 failed: {0} ({1})".format(e.errno, e.strerror), file=sys.stderr)
 
		sys.exit(1)
 

	
 
	os.setsid()
 

	
 
	# do second fork
 
	try:
 
		pid = os.fork()
 
		if pid > 0:
 
			# exit from second parent
 
			print("[{0}] server running".format(pid))
 
			sys.exit(0)
 
	except OSError as e:
 
		print("fork #2 failed: {0} ({1})".format(e.errno, e.strerror), file=sys.stderr)
 
		sys.exit(1)
 

	
 
	fun()
 

	
 
	# all done
 
	os._exit(os.EX_OK)
 

	
 

	
 
def splitHost(host, defaultPort=0):
 
def split_host(host, default_port=0):
 
	address, _, port = host.partition(":")
 
	if not port: port = defaultPort
 
	if not port: port = default_port
 
	return (address, port)
 

	
 

	
 
class Progress:
 
	def __init__(self, n, i0=0):
 
		self._n = n
 
		self._i0 = i0
 
		self._i = i0
 
		self._last = ""
 
		self._past = collections.deque()
 

	
 
	def p(self, i):
 
		i0 = self._i0
 
		n = self._n
 
		now = datetime.now()
 

	
 
		assert i0<=i<n or n<i<=i0, (i0, i, n)
 
		percentage = Progress._p(i, n, i0)
 
		res = "{0}%".format(percentage)
 
		if len(self._past)==0 or (now-self._past[-1][0]).total_seconds()>1:
 
			self._past.append((now, i))
 
		if res!=self._last or (now-self._past[0][0]).total_seconds()>5:
 
			eta = formatSeconds(self.eta(i))
 
			eta = format_seconds(self.eta(i))
 
			self._print("{0} (ETA {1})".format(res, eta))
 
			self._last = res
 
			while (now-self._past[0][0]).total_seconds()>5:
 
				self._past.popleft()
 

	
 
	def done(self):
 
		self._print("100%", end="\n")
 

	
 
	def eta(self, i2):
 
		t2 = datetime.now()
 
		(t1, i1) = self._past[0]
 
		if i2==i1: return float("nan")
 
		return (self._n-i2)/(i2-i1)*(t2-t1).total_seconds()
 

	
 
	@staticmethod
 
	def _p(i, n, i0):
 
		_1=1 if n>=i0 else -1
 
		return 100*(i+_1-i0)//(n-i0)
 

	
 
	def _print(self, s, end=""):
 
		print("\r"+" "*80, end="")
 
		print("\r"+s, end=end)
 

	
 

	
 
def formatSeconds(secs):
 
def format_seconds(secs):
 
	if math.isnan(secs): return "?"
 
	secs = round(secs)
 
	if secs<60: return "{0}s".format(secs)
 
	mins = secs//60
 
	secs %= 60
 
	if mins<60: return "{0:02}:{1:02}".format(mins, secs)
 
	hours = mins//60
 
	mins %= 60
 
	return "{0:02}:{1:02}:{2:02}".format(hours, mins, secs)
 

	
 

	
 
if __name__=="__main__":
0 comments (0 inline, 0 general)