Changeset - 6f31f20feb06
[Not reviewed]
default
0 3 1
Laman - 8 years ago 2017-02-02 21:53:26

client organized into functions
4 files changed with 79 insertions and 59 deletions:
0 comments (0 inline, 0 general)
src/client.py
Show inline comments
 
from hashtree import HashTree
 
import collections
 
from networkers import NetworkReader,NetworkWriter
 
import socket
 
import sys
 
 
 
localTree=HashTree.fromFile(open("clientFile.txt",mode="rb"))
 
 
HOST = '127.0.0.1'    # The remote host
 
PORT = 50009              # The same port as used by the server
 
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
s.connect((HOST, PORT))
 
fr=s.makefile(mode='rb')
 
fw=s.makefile(mode='wb')
 
import config as conf
 
from networkers import NetworkReader,NetworkWriter
 
 
networkReader=NetworkReader(fr)
 
networkReader.start()
 
networkWriter=NetworkWriter(fw)
 
networkWriter.start()
 
 
blocksToTransfer=[]
 
nodeStack=collections.deque([0]) # root
 
incoming=networkReader.output # synchronized message queue
 
outcoming=networkWriter.input
 
 
# initialize session
 
jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount}
 
outcoming.put((jsonData,b""))
 
def connect():
 
	HOST = conf.hosts[0]    # The remote host
 
	PORT = conf.port              # The same port as used by the server
 
	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
	s.connect((HOST, PORT))
 
	fr=s.makefile(mode='rb')
 
	fw=s.makefile(mode='wb')
 
 
# determine which blocks to send
 
while len(nodeStack)>0:
 
  i=nodeStack.pop()
 
  jsonData={"command":"req", "index":i}
 
  outcoming.put((jsonData,b""))
 
  
 
  jsonData,binData=incoming.get(timeout=2)
 
  assert jsonData["index"]==i
 
  assert jsonData["dataType"]=="hash"
 
  
 
  if localTree.store[i]!=binData:
 
    if 2*i+3<len(localTree.store): # inner node
 
      nodeStack.append(2*i+2)
 
      nodeStack.append(2*i+1)
 
    else: blocksToTransfer.append(i-localTree.leafStart) # leaf
 
	networkReader=NetworkReader(fr)
 
	networkReader.start()
 
	networkWriter=NetworkWriter(fw)
 
	networkWriter.start()
 
 
	incoming=networkReader.output # synchronized message queue
 
	outcoming=networkWriter.input
 
 
	return (s,incoming,outcoming)
 
 
 
# send the actual data
 
print(blocksToTransfer)
 
dataFile=open("clientFile.txt",mode="rb")
 
def negotiate(incoming,outcoming):
 
	localTree=HashTree.fromFile(open("clientFile.txt",mode="rb"))
 
	blocksToTransfer=[]
 
	nodeStack=collections.deque([0]) # root
 
 
	# initialize session
 
	jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version}
 
	outcoming.put((jsonData,b""))
 
 
	# determine which blocks to send
 
	while len(nodeStack)>0:
 
		i=nodeStack.pop()
 
		jsonData={"command":"req", "index":i}
 
		outcoming.put((jsonData,b""))
 
 
		jsonData,binData=incoming.get(timeout=2)
 
		assert jsonData["index"]==i
 
		assert jsonData["dataType"]=="hash"
 
 
		if localTree.store[i]!=binData:
 
			if 2*i+3<len(localTree.store): # inner node
 
				nodeStack.append(2*i+2)
 
				nodeStack.append(2*i+1)
 
			else: blocksToTransfer.append(i-localTree.leafStart) # leaf
 
 
	return blocksToTransfer
 
 
 
for i in blocksToTransfer:
 
  jsonData={"command":"send", "index":i, "dataType":"data"}
 
  dataFile.seek(i*localTree.BLOCK_SIZE)
 
  binData=dataFile.read(localTree.BLOCK_SIZE)
 
  
 
  print("block #{0}: {1}...{2}".format(i,binData[:5],binData[-5:]))
 
  
 
  outcoming.put((jsonData,binData),timeout=2)
 
def sendData(outcoming,blocksToTransfer):
 
	print(blocksToTransfer)
 
	dataFile=open("clientFile.txt",mode="rb")
 
 
	for i in blocksToTransfer:
 
		jsonData={"command":"send", "index":i, "dataType":"data"}
 
		dataFile.seek(i*HashTree.BLOCK_SIZE)
 
		binData=dataFile.read(HashTree.BLOCK_SIZE)
 
 
		print("block #{0}: {1}...{2}".format(i,binData[:5],binData[-5:]))
 
 
		outcoming.put((jsonData,binData),timeout=2)
 
 
jsonData={"command":"end"}
 
outcoming.put((jsonData,b""),timeout=2)
 
	jsonData={"command":"end"}
 
	outcoming.put((jsonData,b""),timeout=2)
 
 
	outcoming.put(None)
 
	print("closing...")
 
	dataFile.close()
 
 
 
outcoming.put(None)
 
print("closing...")
 
dataFile.close()
 
# fr.close()
 
# fw.close()
 
s.close()
 
sys.exit(0)
 
if __name__=="__main__":
 
	sock,incoming,outcoming = connect()
 
	blocksToTransfer=negotiate(incoming,outcoming)
 
	sendData(outcoming,blocksToTransfer)
 
 
	sock.close()
 
	sys.exit(0)
src/config.py
Show inline comments
 
new file 100644
 
version=0
 

	
 
hosts=["10.0.0.33"]
 
port=50009
src/db.py
Show inline comments
 
@@ -3,6 +3,7 @@ import time
 

	
 
COMMIT_TIME=2 # s
 

	
 

	
 
class DB:
 
	def __init__(self,filename):
 
		self.filename=filename
src/work.py
Show inline comments
 
@@ -15,7 +15,7 @@ DB_NAME="morevna.db"
 
FILE_COUNT=2**21
 
TARGET_DIR="temp"
 

	
 
log.basicConfig(level=log.DEBUG)
 
log.basicConfig(format="{asctime} {levelname}: {message}", style="{", level=log.DEBUG)
 

	
 
class Pipeline:
 
	def __init__(self,fileName):
 
@@ -29,8 +29,9 @@ class Pipeline:
 
	def update(self):
 
		log.info("starting update")
 
		for (i,(chunk,storedHashRec)) in enumerate(zip_longest(self.chunks(), self.db.listHashes())):
 
			assert storedHashRec is None or storedHashRec["sector_id"]==i, "{0}!={1} {2}".format(i, storedHashRec["sector_id"], storedHashRec)
 
			if not chunk: break
 
			hash=self.hash(chunk)
 
			hash=Pipeline.hash(chunk)
 
			storedHash=(storedHashRec and storedHashRec["sector_hash"]) or b"\0"
 
			if hash==storedHash: continue
 
			log.info("updating sector %d, hash %s != stored %s", i, hash.hex(), storedHash.hex())
0 comments (0 inline, 0 general)