diff --git a/src/client.py b/src/client.py --- a/src/client.py +++ b/src/client.py @@ -11,6 +11,9 @@ from hashtree import HashTree,hashBlock from netnode import BaseConnection,NetNode,FailedConnection +class DeniedConnection(Exception): pass + + class Connection(BaseConnection): def __init__(self,host,port): super().__init__() @@ -42,6 +45,14 @@ class Client(NetNode): print(datetime.now(), "initializing...") super().__init__(filename,treeFile) + def init(self,action): + jsonData={"command":"init", "blockSize":self._tree.BLOCK_SIZE, "blockCount":self._tree.leafCount, "version":conf.version, "action":action} + self._outcoming.writeMsg(jsonData) + jsonData,binData=self._incoming.readMsg() + if jsonData["command"]=="deny": + raise DeniedConnection() + assert jsonData["command"]=="init" + ## Asks server for node hashes to determine which are to be transferred. # # Uses a binary HashTree, where item at k is hash of items at 2k+1, 2k+2. @@ -52,12 +63,6 @@ class Client(NetNode): blocksToTransfer=[] nodeStack=collections.deque([0]) # root - # initialize session - jsonData={"command":"init", "blockSize":localTree.BLOCK_SIZE, "blockCount":localTree.leafCount, "version":conf.version} - self._outcoming.writeMsg(jsonData) - jsonData,binData=self._incoming.readMsg() - assert jsonData["command"]=="ack" - # determine which blocks to send print(datetime.now(), "negotiating:") progress=Progress(localTree.leafCount) @@ -115,12 +120,13 @@ class Client(NetNode): progress.p(k) progress.done() - self._outcoming.writeMsg({"command":"end"}) + self._outcoming.writeMsg({"command":"end","action":"push"}) log.info("closing session...") dataFile.close() def pullData(self,blocksToTransfer): + self._lock() log.info(blocksToTransfer) dataFile=open(self._filename, mode="rb+") i1=-1 @@ -150,6 +156,7 @@ class Client(NetNode): log.info("closing session...") dataFile.close() + self._unlock() if self._treeFile: self._updateTree() diff --git a/src/config.py b/src/config.py --- a/src/config.py +++ b/src/config.py @@ -21,7 +21,7 @@ conf=dict() if os.path.isfile(configFile): with open(configFile) as f: conf=json.load(f) -version=0 +version=(0,0,0) hosts=conf.get("hosts",["127.0.0.1"]) port=conf.get("port",9901) diff --git a/src/morevna.py b/src/morevna.py --- a/src/morevna.py +++ b/src/morevna.py @@ -7,7 +7,7 @@ from util import spawnDaemon, splitHost import config as conf import stats from hashtree import HashTree -from client import Client, Connection as ClientConnection, FailedConnection +from client import Client, Connection as ClientConnection, FailedConnection, DeniedConnection from server import Miniserver @@ -43,12 +43,15 @@ def push(args): try: with ClientConnection(*host) as con: c.setConnection(con) + c.init("push") blocksToTransfer=c.negotiate() c.sendData(blocksToTransfer) print() print(stats.report()) print() - except FailedConnection: continue + except FailedConnection: pass + except DeniedConnection: + print("Server {0}:{1} denied connection.".format(*host)) def pull(args): _checkFile(args.datafile) @@ -62,11 +65,14 @@ def pull(args): try: with ClientConnection(*host) as con: c.setConnection(con) + c.init("pull") blocksToTransfer=c.negotiate() c.pullData(blocksToTransfer) print() print(stats.report()) except FailedConnection: pass + except DeniedConnection: + print("Server {0}:{1} denied connection.".format(*host)) def serve(args): _checkFile(args.datafile) diff --git a/src/netnode.py b/src/netnode.py --- a/src/netnode.py +++ b/src/netnode.py @@ -1,11 +1,17 @@ +import os import socket import logging as log +import config as conf from networkers import NetworkReader,NetworkWriter from hashtree import HashTree +lockFile=os.path.join(conf.directory,"dirty.lock") + + class FailedConnection(Exception): pass +class LockedException(Exception): pass class BaseConnection: # abstract @@ -47,6 +53,19 @@ class NetNode: self._newLeaves=dict() + def isLocked(self): + return os.path.isfile(lockFile) + + def _lock(self): + try: + f=open(lockFile,"x") + f.close() + except FileExistsError: + raise LockedException() + + def _unlock(self): + os.remove(lockFile) + def _updateTree(self): log.info("updating hash tree...") self._tree.batchUpdate(self._newLeaves.items()) diff --git a/src/server.py b/src/server.py --- a/src/server.py +++ b/src/server.py @@ -71,9 +71,12 @@ class Server(NetNode): jsonData,binData=self._incoming.readMsg() if jsonData["command"]=="init": - assert jsonData["blockSize"]==self.BLOCK_SIZE - assert jsonData["blockCount"]==self._tree.leafCount - self._outcoming.writeMsg({"command": "ack"}) + if jsonData["blockSize"]!=self.BLOCK_SIZE or jsonData["blockCount"]!=self._tree.leafCount: + self._outcoming.writeMsg({"command":"deny"}) + if jsonData["action"]=="pull" and self.isLocked(): + self._outcoming.writeMsg({"command":"deny"}) + + self._outcoming.writeMsg({"command":"init", "version":conf.version}) elif jsonData["command"]=="req": if jsonData["dataType"]=="data": @@ -86,7 +89,7 @@ class Server(NetNode): elif jsonData["command"]=="end": self._finalize() - self._locked=False + if jsonData.get("action")=="push": self._unlock() return False else: @@ -115,6 +118,7 @@ class Server(NetNode): return (jsonResponse,binResponse) def _receiveData(self,jsonData,binData): + if not self.isLocked(): self._lock() log.info("received data block #{0}: {1}...{2}".format(jsonData["index"],binData[:5],binData[-5:])) i=jsonData["index"]