From 97b554893c1215a1e35a81b1a13eaf807a3d5220 Mon Sep 17 00:00:00 2001 From: amartin Date: Tue, 19 Jun 2007 23:39:43 +0000 Subject: network --- (limited to 'Util/Network.py') diff --git a/Util/Network.py b/Util/Network.py index c634507..ce72f08 100644 --- a/Util/Network.py +++ b/Util/Network.py @@ -126,10 +126,9 @@ class Network: # prepare message handlers self.processMessage = {} for i in range(1,MAX_MSG_ID): - try: - exec "self.processMessage[" + str(i) + "] = self.process" + MSG_NAME[i] - except: - print "Network:: message handler not defined for " + MSG_NAME[i] + self.processMessage[i] = [] + + self.statusWatcher = [] # data packing classes self.packer = xdrlib.Packer() @@ -148,11 +147,13 @@ class Network: self.outputSockets = [] self.exceptSockets = [] self.connection = {} # dict of connections indexed by socket - # self.processTimeout = False self.latencyQueryHandler = {} self.latencyQueryStart = {} + self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY ) + self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY ) + self.setMode( mode, hostaddress ) def shutdown( self ): @@ -193,10 +194,6 @@ class Network: self.socket = None self.hostAddress = None -# if self.processTimeout: -# gobject.source_remove( self.processTimeout ) -# self.processTimeout = False - # initialize new mode self.mode = mode @@ -259,9 +256,6 @@ class Network: self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) self.listener = None - #if self.mode != MD_OFFLINE: - # self.processTimeout = gobject.timeout_add( 10, self.process ) - def addPeer( self, peer, address ): if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0] self.connection[peer] = Connection( peer, address ) @@ -274,6 +268,37 @@ class Network: self.inputSockets.remove(peer) self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) + # register a status watcher, format: func( self, status, args ) + def addWatcher( self, func ): + self.statusWatcher.append( func ) + + def removeWatcher( self, func ): + self.statusWatcher.remove( func ) + + # connect a message handler, format: func( self, sock, message, data ) + def connectMessage( self, message, func ): + self.processMessage[message].append(func) + + def connectMessageAfter( self, message, func, after ): + try: + ind = self.processMessage[message].index(after) + self.processMessage[message].insert(ind+1,func) + except: + print "Network::connectMessageAfter:: function not registered: " + str(after) + + def connectMessageBefore( self, message, func, before ): + try: + ind = self.processMessage[message].index(before) + self.processMessage[message].insert(ind,func) + except: + print "Network::connectMessageBefore:: function not registered: " + str(before) + + def disconnectMessage( self, message, func ): + try: + self.processMessage[message].remove(func) + except: + print "Network::disconnectMessage:: function not registered: " + str(func) + def isOffline( self ): if self.mode == MD_OFFLINE: return True return False @@ -365,8 +390,7 @@ class Network: except socket.error, (value, errmsg): print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) # TODO something intelligent - - + def queryLatency( self, handler ): if self.mode != MD_PEER: return @@ -378,17 +402,6 @@ class Network: self.latencyQueryStart[hash] = time.time() self.send(PR_LATENCY_QUERY,hash) - def querySync( self, handler ): - if self.mode != MD_PEER: - return - - self.packer.pack_float(random.random()) - hash = self.packer.get_buffer() - self.packer.reset() - self.latencyQueryHandler[hash] = handler - self.latencyQueryStart[hash] = time.time() - self.send(PR_SYNC_QUERY,hash) - #----------------------------------------------------------------------- # Message Handlers @@ -451,7 +464,8 @@ class Network: data = con.recvBuf[0:con.waitingForData] con.recvBuf = con.recvBuf[con.waitingForData:] con.waitingForData = 0 - self.processMessage[con.message]( sock, data ) + for func in self.processMessage[con.message]: + func( sock, con.message, data ) else: return # wait for more data @@ -459,7 +473,8 @@ class Network: con.message = ord(con.recvBuf[0]) if MSG_SIZE[con.message] == 0: con.recvBuf = con.recvBuf[1:] - self.processMessage[con.message]( sock, "" ) + for func in self.processMessage[con.message]: + func( sock, con.message, "" ) else: con.waitingForData = MSG_SIZE[con.message] con.recvBuf = con.recvBuf[1:] @@ -471,15 +486,7 @@ class Network: def processPR_LATENCY_QUERY( self, sock, data ): self.send( HT_LATENCY_REPLY, data, sock ) - def processPR_SYNC_QUERY( self, sock, data ): - self.packer.pack_float(self.nextHeartbeat()) - self.send( HT_SYNC_REPLY, data + self.packer.get_buffer(), sock ) - self.packer.reset() - - def registerHeartbeat( self, handler ): - self.nextHeartbeat = handler - - #-- PEER handlers ------------------------------------------------------ + #-- PEER handlers ------------------------------------------------------ def processHT_LATENCY_REPLY( self, sock, data ): t = time.time() latency = t - self.latencyQueryStart[data] @@ -489,15 +496,4 @@ class Network: self.latencyQueryStart.pop(data) #self.queryLatency() - def processHT_SYNC_REPLY( self, sock, data ): - t = time.time() - hash = data[0:4] - latency = t - self.latencyQueryStart[hash] - #print "got sync reply %d" % (latency*1000) - self.unpacker.reset(data[4:8]) - self.latencyQueryHandler[hash]( latency, self.unpacker.unpack_float() ) - self.latencyQueryHandler.pop(hash) - self.latencyQueryStart.pop(hash) - - -- cgit v0.9.1