Changeset - a4bfaabaeabb
[Not reviewed]
default
0 2 0
Laman - 7 years ago 2018-01-25 02:14:02

refactored data file optimizations from client and server to datafile
2 files changed with 13 insertions and 22 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
@@ -96,45 +96,41 @@ class Client(NetNode):
 
						blocksToTransfer.append(i-localTree.leafStart) # leaf
 
						progress.p(i-localTree.leafStart)
 
			nodeStack.extend(reversed(frontier))
 
		progress.done()
 

	
 
		size=stats.formatBytes(len(blocksToTransfer)*self._tree.BLOCK_SIZE)
 
		print(datetime.now(), "{0} to transfer".format(size))
 

	
 
		return blocksToTransfer
 

	
 
	def sendData(self,blocksToTransfer):
 
		log.info(blocksToTransfer)
 
		dataFile=open(self._filename, mode="rb")
 
		i1=-1
 
		dataFile=DataFile.open(self._filename)
 

	
 
		print(datetime.now(), "sending data:")
 
		progress=Progress(len(blocksToTransfer))
 

	
 
		for k in range(0,len(blocksToTransfer),conf.batchSize.data):
 
			indices=[]
 
			blocks=[]
 
			for j in range(conf.batchSize.data):
 
				if k+j>=len(blocksToTransfer): break
 
				i2=blocksToTransfer[k+j]
 
				if i1+1!=i2:
 
					dataFile.seek(i2*HashTree.BLOCK_SIZE)
 
				block=dataFile.read(HashTree.BLOCK_SIZE)
 
				i=blocksToTransfer[k+j]
 
				block=dataFile.readFrom(i)
 

	
 
				indices.append(i2)
 
				indices.append(i)
 
				blocks.append(block)
 
				log.info("block #{0}: {1}...{2}".format(i2,block[:5],block[-5:]))
 
				log.info("block #{0}: {1}...{2}".format(i,block[:5],block[-5:]))
 

	
 
				i1=i2
 
				progress.p(k+j)
 
			if indices: self._sendData(indices,blocks)
 
		progress.done()
 

	
 
		self._outcoming.writeMsg({"command":"end","action":"push"})
 

	
 
		log.info("closing session...")
 
		dataFile.close()
 

	
 
	def pullData(self,blocksToTransfer,ignoreLock=False):
 
		if not ignoreLock:
 
			try:
src/server.py
Show inline comments
 
import socket
 
import ssl
 
import multiprocessing
 
import logging as log
 

	
 
from hashtree import hashBlock
 
from netnode import BaseConnection,NetNode
 
import config as conf
 
import status
 
from datafile import DataFile
 

	
 

	
 
class Connection(BaseConnection):
 
	def __init__(self,serverSocket,sslContext):
 
		super().__init__()
 

	
 
		sock, address = serverSocket.accept()
 
		self._socket=sslContext.wrap_socket(sock,server_side=True)
 

	
 
		log.info('Connected by {0}'.format(address))
 
		self.createNetworkers()
 

	
 
@@ -45,38 +46,37 @@ class Miniserver:
 
					continue
 
				p=multiprocessing.Process(target=Server.run,args=(connection,self._filename,self._treeFile))
 
				p.start()
 

	
 

	
 
class Server(NetNode):
 
	def __init__(self,connection,filename,treeFile=""):
 
		super().__init__(filename,treeFile)
 
		(self._incoming,self._outcoming)=connection
 

	
 
		self.BLOCK_SIZE=self._tree.BLOCK_SIZE
 

	
 
		self._lastIndex=-1
 
		self._dataFileHandle=None
 
		self._dataFileObj=None
 

	
 
	@staticmethod
 
	def run(connection,*args):
 
		with connection as c:
 
			s=Server(c,*args)
 
			s.serve()
 

	
 
	@property
 
	def _dataFile(self):
 
		if not self._dataFileHandle:
 
			self._dataFileHandle=open(self._filename, mode="rb+")
 
		return self._dataFileHandle
 
		if not self._dataFileObj:
 
			self._dataFileObj=DataFile.open(self._filename, mode="rb+")
 
		return self._dataFileObj
 

	
 
	def serve(self):
 
		try:
 
			while self._serveOne(): pass
 
		except (AssertionError,ConnectionResetError) as e:
 
			log.warning(e)
 

	
 
	def _serveOne(self):
 
		jsonData,binData=self._incoming.readMsg()
 

	
 
		if jsonData["command"]=="init":
 
			if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount:
 
@@ -116,41 +116,36 @@ class Server(NetNode):
 

	
 
		jsonResponse={"command":"send", "index":indices, "dataType":"hash"}
 
		binResponse=b"".join(hashes)
 

	
 
		return (jsonResponse,binResponse)
 

	
 
	def _requestData(self,index):
 
		log.info("received request for data block #{0}".format(index))
 

	
 
		jsonResponse={"command":"send", "index":index, "dataType":"data"}
 
		blocks=[]
 
		for i in index:
 
			if self._lastIndex+1!=i:
 
				self._dataFile.seek(i*self.BLOCK_SIZE)
 
			blocks.append(self._dataFile.read(self.BLOCK_SIZE))
 
			blocks.append(self._dataFile.readFrom(i))
 

	
 
		return (jsonResponse,b"".join(blocks))
 

	
 
	def _receiveData(self,jsonData,binData):
 
		if not self.isLocked(): self._lock()
 
		log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:]))
 

	
 
		indices=jsonData["index"]
 
		for (i,k) in enumerate(indices):
 
			if self._lastIndex+1!=k:
 
				self._dataFile.seek(k*self.BLOCK_SIZE)
 
			block=binData[i*self.BLOCK_SIZE:(i+1)*self.BLOCK_SIZE]
 
			self._dataFile.write(block)
 
			self._lastIndex=k
 
			self._dataFile.writeAt(k,block)
 
			if self._treeFile:
 
				self._newLeaves[k+self._tree.leafStart]=hashBlock(block)
 

	
 
		return ({"command": "ack", "index": indices},)
 

	
 
	def _finalize(self):
 
		log.info("closing session...")
 
		self._dataFile.close()
 
		self._dataFileHandle=None
 
		self._dataFileObj=None
 
		if self._treeFile:
 
			self._updateTree()
 
		log.info("done")
0 comments (0 inline, 0 general)