Changeset - 87a9ced6e7b5
[Not reviewed]
default
0 4 0
Laman - 7 years ago 2018-01-20 00:29:18

batch push
4 files changed with 35 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -88,88 +88,100 @@ class Client(NetNode):
 
				(j1,j2)=[HashTree.HASH_LEN*ji for ji in (j,j+1)]
 
				if localTree.store[i]!=binData[j1:j2]:
 
					# ie. 0-6 nodes, 7-14 leaves. 2*6+2<15
 
					if 2*i+2<len(localTree.store): # inner node
 
						frontier.append(2*i+1)
 
						frontier.append(2*i+2)
 
					else:
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size=stats.formatBytes(len(blocksToTransfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			jsonData={"command":"send", "index":i2, "dataType":"data"}
 

	
 
		for k in range(0,len(blocksToTransfer),conf.batchSize):
 
			indices=[]
 
			blocks=[]
 
			for j in range(conf.batchSize):
 
				if k+j>=len(blocksToTransfer): break
 
				i2=blocksToTransfer[k+j]
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			binData=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 
				block=dataFile.read(HashTree.BLOCK_SIZE)
 

	
 
			self._outcoming.writeMsg(jsonData,binData)
 
			stats.logTransferredBlock()
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="ack" and jsonData["index"]==i2, jsonData
 
				indices.append(i2)
 
				blocks.append(block)
 
				log.info("block #{0}: {1}...{2}".format(i2,block[:5],block[-5:]))
 

	
 
			i1=i2
 
			progress.p(k)
 
				progress.p(k+j)
 
			if indices: self._sendData(indices,blocks)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end","action":"push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer,ignoreLock=False):
 
		if not ignoreLock:
 
			try:
 
				self._lock()
 
			except LockedException:
 
				print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
 
				return
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb+")
 
		i1=-1
 

	
 
		print(datetime.now(), "receiving data:")
 
		progress=Progress(len(blocksToTransfer))
 
		for (k,i2) in enumerate(blocksToTransfer):
 
			self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})
 
			jsonData,binData=self._incoming.readMsg()
 
			assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData
 

	
 
			if i1+1!=i2:
 
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
			dataFile.write(binData)
 

	
 
			if self._treeFile:
 
				self._newLeaves[i2+self._tree.leafStart]=hashBlock(binData)
 

	
 
			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
 

	
 
			stats.logTransferredBlock()
 
			i1=i2
 
			progress.p(k)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 
		self._unlock()
 

	
 
		if self._treeFile:
 
			self._updateTree()
 

	
 
	def _sendData(self,indices,blocks):
 
		jsonData={"command":"send", "index":indices, "dataType":"data"}
 
		binData=b"".join(blocks)
 
		self._outcoming.writeMsg(jsonData,binData)
 
		stats.logTransferredBlock(len(indices))
 
		jsonData,binData=self._incoming.readMsg()
 
		assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData
 

	
 
	def setConnection(self,connection):
 
		(self._incoming,self._outcoming)=connection
src/config.py
Show inline comments
 
import os
 
import json
 
import logging as log
 
from logging.handlers import TimedRotatingFileHandler
 

	
 

	
 
logger=log.getLogger()
 
logger.setLevel(log.INFO)
 
formatter=log.Formatter("%(asctime)s %(levelname)s: %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
 
handler=TimedRotatingFileHandler("/var/log/morevna/mor.log",when="midnight",backupCount=9)
 
handler.setFormatter(formatter)
 
logger.addHandler(handler)
 

	
 
directory=os.path.join(os.path.dirname(__file__),"..")
 
certfile=os.path.join(directory,"certs/cert.pem")
 
keyfile=os.path.join(directory,"certs/key.pem")
 
peers=os.path.join(directory,"certs/peers.pem")
 

	
 
configFile=os.path.join(directory,"config.json")
 
conf=dict()
 
if os.path.isfile(configFile):
 
	with open(configFile) as f: conf=json.load(f)
 

	
 
version=[0,0,1]
 
lowestCompatible=[0,0,0] # tuple is more fitting but json conversion transforms it into a list anyway
 
version=[0,1,0]
 
lowestCompatible=[0,1,0] # tuple is more fitting but json conversion transforms it into a list anyway
 

	
 
hosts=conf.get("hosts",["127.0.0.1"])
 
port=conf.get("port",9901)
 

	
 
batchSize=conf.get("batchSize",256)
 
batchSize=conf.get("batchSize",64)
src/server.py
Show inline comments
 
@@ -112,41 +112,42 @@ class Server(NetNode):
 
	def _requestHash(self,indices):
 
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
 
		assert all(i<len(self._tree.store) for i in indices)
 
		hashes=[self._tree.store[i] for i in indices]
 

	
 
		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse=b"".join(hashes)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		if self._lastIndex+1!=index:
 
			self._dataFile.seek(index*self.BLOCK_SIZE)
 
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _receiveData(self,jsonData,binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		i=jsonData["index"]
 
		if self._lastIndex+1!=i:
 
			self._dataFile.seek(i*self.BLOCK_SIZE)
 
		self._dataFile.write(binData)
 
		self._lastIndex=i
 
		indices=jsonData["index"]
 
		for (i,k) in enumerate(indices):
 
			if self._lastIndex+1!=k:
 
				self._dataFile.seek(k*self.BLOCK_SIZE)
 
			self._dataFile.write(binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE])
 
			self._lastIndex=k
 
		if self._treeFile:
 
			self._newLeaves[i+self._tree.leafStart]=hashBlock(binData)
 
				self._newLeaves[k+self._tree.leafStart]=hashBlock(binData)
 

	
 
		return ({"command": "ack", "index": i},)
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileHandle=None
 
		if self._treeFile:
 
			self._updateTree()
 
		log.info("done")
src/stats.py
Show inline comments
 
class Stats:
 
	def __init__(self):
 
		self.received=0
 
		self.sent=0
 
		self.exchangedNodes=0
 
		self.transferredBlocks=0
 

	
 
stats=Stats()
 

	
 

	
 
def logReceived(data):
 
	stats.received+=len(data)
 

	
 

	
 
def logSent(data):
 
	stats.sent+=len(data)
 

	
 

	
 
def logExchangedNode(k=1):
 
	stats.exchangedNodes+=k
 

	
 

	
 
def logTransferredBlock():
 
	stats.transferredBlocks+=1
 
def logTransferredBlock(k=1):
 
	stats.transferredBlocks+=k
 

	
 

	
 
def reset():
 
	global stats
 
	stats=Stats()
 

	
 

	
 
def report():
 
	return """received {rf} ({r:,} B)
 
sent {sf} ({s:,} B)
 
exchanged {nodes:,} hash tree nodes
 
transferred {blocks:,} blocks""".format(rf=formatBytes(stats.received), r=stats.received, sf=formatBytes(stats.sent), s=stats.sent, nodes=stats.exchangedNodes, blocks=stats.transferredBlocks)
 

	
 

	
 
def formatBytes(x):
 
	exts=["B","kiB","MiB","GiB","TiB","PiB"]
 
	i=0
 
	while x>1024:
 
		x/=1024
 
		i+=1
 
	if x>=100: x=round(x)
 
	elif x>=10: x=round(x,1)
 
	else: x=round(x,2)
 
	return "{0} {1}".format(x,exts[i])
0 comments (0 inline, 0 general)