Changeset - bb3b53ee15d6
Laman - 2018-01-21 00:42:05

batch pull
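This changeset batches the pull protocol: instead of one request/response round trip per block, the client asks for up to conf.batchSize block indices in a single message and the server replies with the corresponding blocks concatenated into one payload. The per-block bookkeeping in Client.pullData (i1, dataFile.seek, dataFile.write) gives way to the new DataFile helper, and Server._requestData now accepts a list of indices. A rough sketch of the message shapes, using the field names visible in the diff (the writeMsg/readMsg framing is the project's own and the concrete values are invented for illustration):

	# hypothetical values, for illustration only
	old_request={"command":"req", "index":42, "dataType":"data"}          # before: one block per round trip
	new_request={"command":"req", "index":[42,43,47], "dataType":"data"}  # after: one batch per round trip
	new_reply_json={"command":"send", "index":[42,43,47], "dataType":"data"}
	# followed by a binary payload: block_42+block_43+block_47, each HashTree.BLOCK_SIZE bytes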
4 files changed with 50 insertions and 18 deletions:
src/client.py
import collections
import socket
import ssl
import logging as log
from datetime import datetime

import config as conf
import status
import stats
from util import Progress
from hashtree import HashTree,hashBlock
from netnode import BaseConnection,NetNode,FailedConnection,LockedException,IncompatibleException
from datafile import DataFile


class DeniedConnection(Exception): pass


class Connection(BaseConnection):
	def __init__(self,host,port):
		super().__init__()
		sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)

		sslContext=ssl.create_default_context(cafile=conf.peers)
		sslContext.check_hostname=False
		sslContext.load_cert_chain(conf.certfile,conf.keyfile)

		self._socket=sslContext.wrap_socket(sock)

		try:
			self._socket.connect((host,port))
		except (ConnectionRefusedError,OSError) as e:
			log.exception(e)
			print("Couldn't connect to {0}:{1}".format(host,port))
			raise FailedConnection()
		except ssl.SSLError as e:
			log.exception(e)
@@ -121,67 +122,66 @@ class Client(NetNode):
				block=dataFile.read(HashTree.BLOCK_SIZE)

				indices.append(i2)
				blocks.append(block)
				log.info("block #{0}: {1}...{2}".format(i2,block[:5],block[-5:]))

				i1=i2
				progress.p(k+j)
			if indices: self._sendData(indices,blocks)
		progress.done()

		self._outcoming.writeMsg({"command":"end","action":"push"})

		log.info("closing session...")
		dataFile.close()

	def pullData(self,blocksToTransfer,ignoreLock=False):
		if not ignoreLock:
			try:
				self._lock()
			except LockedException:
				print("The file is locked. Either (a) there's another pull going on (then wait or kill it), or (b) a previous pull ended prematurely and the file is probably corrupt (then repeat pull with -f for force).")
				return
		log.info(blocksToTransfer)
		dataFile=open(self._filename, mode="rb+")
		i1=-1
		dataFile=DataFile.open(self._filename, mode="rb+")

		print(datetime.now(), "receiving data:")
		progress=Progress(len(blocksToTransfer))
		for (k,i2) in enumerate(blocksToTransfer):
			self._outcoming.writeMsg({"command":"req", "index":i2, "dataType":"data"})

		for k in range(0,len(blocksToTransfer),conf.batchSize):
			indices=blocksToTransfer[k:k+conf.batchSize]
			self._outcoming.writeMsg({"command":"req", "index":indices, "dataType":"data"})
			jsonData,binData=self._incoming.readMsg()
			assert jsonData["command"]=="send" and jsonData["index"]==i2 and jsonData["dataType"]=="data", jsonData

			if i1+1!=i2:
				dataFile.seek(i2*HashTree.BLOCK_SIZE)
			dataFile.write(binData)
			assert jsonData["command"]=="send" and jsonData["index"]==indices and jsonData["dataType"]=="data", jsonData
			for (j,i) in enumerate(indices):
				block=binData[j*HashTree.BLOCK_SIZE:(j+1)*HashTree.BLOCK_SIZE]
				dataFile.writeAt(i,block)

			if self._treeFile:
				self._newLeaves[i2+self._tree.leafStart]=hashBlock(binData)
					self._newLeaves[i+self._tree.leafStart]=hashBlock(block)

			log.info("block #{0}: {1}...{2}".format(i2,binData[:5],binData[-5:]))
				log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:]))

			stats.logTransferredBlock()
			i1=i2
			progress.p(k)
				progress.p(k+j)
		progress.done()

		self._outcoming.writeMsg({"command":"end"})

		log.info("closing session...")
		dataFile.close()
		self._unlock()

		if self._treeFile:
			self._updateTree()

	def _sendData(self,indices,blocks):
		jsonData={"command":"send", "index":indices, "dataType":"data"}
		binData=b"".join(blocks)
		self._outcoming.writeMsg(jsonData,binData)
		stats.logTransferredBlock(len(indices))
		jsonData,binData=self._incoming.readMsg()
		assert jsonData["command"]=="ack" and jsonData["index"]==indices, jsonData

	def setConnection(self,connection):
		(self._incoming,self._outcoming)=connection
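The heart of the new client loop is cutting the concatenated payload back into fixed-size blocks and writing each one at its own offset through DataFile.writeAt. A self-contained sketch of that slicing, with an invented block size and payload:

	BLOCK_SIZE=4                      # stand-in for HashTree.BLOCK_SIZE
	indices=[10, 11, 20]
	binData=b"aaaabbbbcccc"           # three blocks as concatenated by the server
	for (j,i) in enumerate(indices):
		block=binData[j*BLOCK_SIZE:(j+1)*BLOCK_SIZE]
		# block 10 -> b"aaaa", block 11 -> b"bbbb", block 20 -> b"cccc"
		# pullData hands each slice to dataFile.writeAt(i,block) and, when a tree file
		# is attached, records hashBlock(block) for leaf i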
src/datafile.py
new file 100644
from hashtree import HashTree


BLOCK_SIZE=HashTree.BLOCK_SIZE


class DataFile:
	def __init__(self,fileHandle):
		self._lastIndex=0
		self._f=fileHandle

	@staticmethod
	def open(filename,mode="rb"):
		return DataFile(open(filename,mode=mode))

	def writeAt(self,i,blockData):
		if i!=self._lastIndex+1:
			self._f.seek(i*BLOCK_SIZE)
		self._f.write(blockData)
		self._lastIndex=i

	def readFrom(self,i,byteCount=BLOCK_SIZE):
		if i!=self._lastIndex+1:
			self._f.seek(i*BLOCK_SIZE)
		self._lastIndex=i
		return self._f.read(byteCount)

	def close(self):
		self._f.close()
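DataFile is a thin wrapper that remembers the last block index it touched and only seeks when the next access is not sequential. A small usage sketch (the file name is invented):

	from datafile import DataFile, BLOCK_SIZE

	df=DataFile.open("backup.img", mode="rb+")
	df.writeAt(0, b"\x00"*BLOCK_SIZE)   # non-sequential first access: seeks to offset 0
	df.writeAt(1, b"\x01"*BLOCK_SIZE)   # follows block 0: written without an extra seek
	df.writeAt(7, b"\x07"*BLOCK_SIZE)   # gap: seeks to 7*BLOCK_SIZE first
	data=df.readFrom(8)                 # continues right after block 7, again without a seek
	df.close()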
src/morevna.py
import sys
import os.path
import logging as log
from argparse import ArgumentParser

from util import spawnDaemon, splitHost
import config as conf
import stats
from hashtree import HashTree
from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection
from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection, IncompatibleException
from server import Miniserver


def _checkFile(f):
	if not os.path.isfile(f):
		print("invalid file specified:",f,file=sys.stderr)
		sys.exit(1)


def buildTree(args):
	_checkFile(args.datafile)
	if os.path.isfile(args.treefile):
		treeMod=os.stat(args.treefile).st_mtime
		dataMod=os.stat(args.datafile).st_mtime
		if dataMod<treeMod and not args.force:
			print("tree file is up to date")
			return

	tree=HashTree.fromFile(args.datafile)
	tree.save(args.treefile)

def push(args):
	_checkFile(args.datafile)
	if args.tree:
		_checkFile(args.tree)
	if args.host: conf.hosts=[args.host]
	if args.port: conf.port=args.port

	c=Client(args.datafile,args.tree)
	for h in conf.hosts:
		host=splitHost(h,conf.port)
		stats.reset()
		try:
			with ClientConnection(*host) as con:
				c.setConnection(con)
				c.init("push")
				blocksToTransfer=c.negotiate()
				c.sendData(blocksToTransfer)
			print()
			print(stats.report())
			print()
		except FailedConnection: pass
		except DeniedConnection as e:
			print("Server {0}:{1} denied connection.".format(*host))
			print(e)
		except IncompatibleException as e: print(e)

def pull(args):
	_checkFile(args.datafile)
	if args.tree:
		_checkFile(args.tree)
	if args.host: conf.hosts=[args.host]
	if args.port: conf.port=args.port

	c=Client(args.datafile,args.tree)
	host=splitHost(conf.hosts[0],conf.port)
	try:
		with ClientConnection(*host) as con:
			c.setConnection(con)
			c.init("pull")
			blocksToTransfer=c.negotiate()
			c.pullData(blocksToTransfer,args.force)
		print()
		print(stats.report())
	except FailedConnection: pass
	except DeniedConnection as e:
		print("Server {0}:{1} denied connection.".format(*host))
		print(e)

def serve(args):
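The visible part of morevna.py ends before the argument parser is built, so the command dispatch is not shown here. A minimal sketch of how handlers like pull are commonly wired with argparse (the subcommand and option names below are assumptions, not taken from the diff):

	parser=ArgumentParser()
	subparsers=parser.add_subparsers()
	p=subparsers.add_parser("pull")                     # assumed subcommand name
	p.add_argument("datafile")
	p.add_argument("-t","--tree")                       # optional hash-tree file, maps to args.tree
	p.add_argument("-f","--force",action="store_true")  # maps to args.force in pull()
	p.add_argument("--host")
	p.add_argument("--port",type=int)
	p.set_defaults(func=pull)
	args=parser.parse_args()
	args.func(args)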
src/server.py
@@ -102,52 +102,54 @@ class Server(NetNode):
		elif jsonData["command"]=="end":
			self._finalize()
			if jsonData.get("action")=="push": self._unlock()
			return False

		else:
			assert False, jsonData["command"]

		return True

	def _requestHash(self,indices):
		log.info("received request for nodes #{0}".format(",".join(str(i) for i in indices)))
		assert all(i<len(self._tree.store) for i in indices)
		hashes=[self._tree.store[i] for i in indices]

		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
		binResponse=b"".join(hashes)

		return (jsonResponse,binResponse)

	def _requestData(self,index):
		log.info("received request for data block #{0}".format(index))

		jsonResponse={"command":"send", "index":index, "dataType":"data"}
		if self._lastIndex+1!=index:
			self._dataFile.seek(index*self.BLOCK_SIZE)
		binResponse=self._dataFile.read(self.BLOCK_SIZE)
		blocks=[]
		for i in index:
			if self._lastIndex+1!=i:
				self._dataFile.seek(i*self.BLOCK_SIZE)
			blocks.append(self._dataFile.read(self.BLOCK_SIZE))

		return (jsonResponse,binResponse)
		return (jsonResponse,b"".join(blocks))

	def _receiveData(self,jsonData,binData):
		if not self.isLocked(): self._lock()
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))

		indices=jsonData["index"]
		for (i,k) in enumerate(indices):
			if self._lastIndex+1!=k:
				self._dataFile.seek(k*self.BLOCK_SIZE)
			self._dataFile.write(binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE])
			self._lastIndex=k
			if self._treeFile:
				self._newLeaves[k+self._tree.leafStart]=hashBlock(binData)

		return ({"command": "ack", "index": indices},)

	def _finalize(self):
		log.info("closing session...")
		self._dataFile.close()
		self._dataFileHandle=None
		if self._treeFile:
			self._updateTree()
		log.info("done")
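On the server side, _requestData now takes a list of indices and answers with the requested blocks joined into one byte string, while _receiveData already walks jsonData["index"] as a list for batched pushes. Conceptually the batched read amounts to the following reduced sketch (not the class itself; the helper name is invented):

	def read_blocks(dataFile, indices, block_size):
		# mirror of Server._requestData: gather the requested blocks in request order
		blocks=[]
		for i in indices:
			dataFile.seek(i*block_size)   # the real code skips the seek when i follows the previous block
			blocks.append(dataFile.read(block_size))
		return b"".join(blocks)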