diff options
author | Oli <olivier.belanger@umontreal.ca> | 2007-06-19 21:07:56 (GMT) |
---|---|---|
committer | Oli <olivier.belanger@umontreal.ca> | 2007-06-19 21:07:56 (GMT) |
commit | f369cb9982618ae3c54d29f6bb3b51714b4b70f7 (patch) | |
tree | 35656a059eab41bb33435b57c3ecc3b2e0b04743 /Util | |
parent | 5c9a928310120a7ae95c6dde45b9f9ddc18c0c57 (diff) | |
parent | 6dac818750901beb90b79584b41e2df9be673c7c (diff) |
Merge branch 'master' of git+ssh://olipet@dev.laptop.org/git/projects/tamtam
Diffstat (limited to 'Util')
-rw-r--r-- | Util/Network.py | 120 |
1 files changed, 58 insertions, 62 deletions
diff --git a/Util/Network.py b/Util/Network.py index f90ea6f..f884beb 100644 --- a/Util/Network.py +++ b/Util/Network.py @@ -40,9 +40,11 @@ MD_PEER = 2 message_enum = [ ("HT_LATENCY_REPLY", 4), # reply to latency test ("HT_SYNC_REPLY", 8), # reply to sync test +("HT_TEMPO_UPDATE", 4), # reply to sync test ("PR_LATENCY_QUERY", 4), # test latency ("PR_SYNC_QUERY", 4), # test sync +("PR_TEMPO_QUERY", 4), # test sync ("MAX_MSG_ID", 0) ] @@ -126,10 +128,9 @@ class Network: # prepare message handlers self.processMessage = {} for i in range(1,MAX_MSG_ID): - try: - exec "self.processMessage[" + str(i) + "] = self.process" + MSG_NAME[i] - except: - print "Network:: message handler not defined for " + MSG_NAME[i] + self.processMessage[i] = [] + + self.statusWatcher = [] # data packing classes self.packer = xdrlib.Packer() @@ -148,11 +149,13 @@ class Network: self.outputSockets = [] self.exceptSockets = [] self.connection = {} # dict of connections indexed by socket - # self.processTimeout = False self.latencyQueryHandler = {} self.latencyQueryStart = {} + self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY ) + self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY ) + self.setMode( mode, hostaddress ) def shutdown( self ): @@ -193,10 +196,6 @@ class Network: self.socket = None self.hostAddress = None -# if self.processTimeout: -# gobject.source_remove( self.processTimeout ) -# self.processTimeout = False - # initialize new mode self.mode = mode @@ -205,7 +204,7 @@ class Network: try: self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) address = ("",PORT) - self.connection[socket] = Connection( self.socket, address ) + self.connection[self.socket] = Connection( self.socket, address ) self.socket.bind(address) # self.socket.setblocking(0) self.socket.listen(BACKLOG) @@ -241,7 +240,6 @@ class Network: else: self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) self.listener.start() - self.queryLatency( lambda x: (x*1000) ) except socket.error, (value, message): if self.socket: self.socket.close() @@ -259,9 +257,6 @@ class Network: self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) self.listener = None - #if self.mode != MD_OFFLINE: - # self.processTimeout = gobject.timeout_add( 10, self.process ) - def addPeer( self, peer, address ): if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0] self.connection[peer] = Connection( peer, address ) @@ -274,6 +269,37 @@ class Network: self.inputSockets.remove(peer) self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) + # register a status watcher, format: func( self, status, args ) + def addWatcher( self, func ): + self.statusWatcher.append( func ) + + def removeWatcher( self, func ): + self.statusWatcher.remove( func ) + + # connect a message handler, format: func( self, sock, message, data ) + def connectMessage( self, message, func ): + self.processMessage[message].append(func) + + def connectMessageAfter( self, message, func, after ): + try: + ind = self.processMessage[message].index(after) + self.processMessage[message].insert(ind+1,func) + except: + print "Network::connectMessageAfter:: function not registered: " + str(after) + + def connectMessageBefore( self, message, func, before ): + try: + ind = self.processMessage[message].index(before) + self.processMessage[message].insert(ind,func) + except: + print "Network::connectMessageBefore:: function not registered: " + str(before) + + def disconnectMessage( self, message, func ): + try: + self.processMessage[message].remove(func) + except: + print "Network::disconnectMessage:: function not registered: " + str(func) + def isOffline( self ): if self.mode == MD_OFFLINE: return True return False @@ -322,14 +348,14 @@ class Network: if self.mode == MD_PEER: try: self.socket.send( msg ) - print "Network:: sent %d bytes: %s" % (len(msg),msg) + #print "Network:: sent %d bytes: %s" % (len(msg),msg) except socket.error, (value, errmsg): print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.hostAddress[0], errmsg) # TODO something intelligent else: # MD_HOST try: to.send( msg ) - print "Network:: sent %d bytes: %s" % (len(msg),msg) + #print "Network:: sent %d bytes: %s" % (len(msg),msg) except socket.error, (value, errmsg): print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) # TODO something intelligent @@ -357,17 +383,16 @@ class Network: msg = chr(message) + self.packer.get_buffer() + data self.packer.reset() - for con in self.connection: - if con.socket == self.socket: + for sock in self.connection: + if sock == self.socket: continue try: - con.socket.send( msg ) + sock.send( msg ) except socket.error, (value, errmsg): - print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) + print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[sock].address[0], errmsg) # TODO something intelligent - - - def queryLatency( self, handler ): + + def sendLatencyQuery( self, handler ): if self.mode != MD_PEER: return @@ -378,17 +403,6 @@ class Network: self.latencyQueryStart[hash] = time.time() self.send(PR_LATENCY_QUERY,hash) - def querySync( self, handler ): - if self.mode != MD_PEER: - return - - self.packer.pack_float(random.random()) - hash = self.packer.get_buffer() - self.packer.reset() - self.latencyQueryHandler[hash] = handler - self.latencyQueryStart[hash] = time.time() - self.send(PR_SYNC_QUERY,hash) - #----------------------------------------------------------------------- # Message Handlers @@ -408,7 +422,7 @@ class Network: else: try: data = s.recv(MAX_SIZE) - print "Network:: recv %d bytes: %s" % (len(data), data) + #print "Network:: recv %d bytes: %s" % (len(data), data) if not len(data): # no data to read, socket must be closed self.removePeer(s) else: @@ -424,7 +438,7 @@ class Network: if not len(data): # no data to read, socket must be closed self.setMode( MD_OFFLINE ) else: - print "Network:: recv %d bytes: %s" % (len(data), data) + #print "Network:: recv %d bytes: %s" % (len(data), data) self.processStream( s, data ) except socket.error, (value, message): print "Network:: error reading data: " + message @@ -451,7 +465,8 @@ class Network: data = con.recvBuf[0:con.waitingForData] con.recvBuf = con.recvBuf[con.waitingForData:] con.waitingForData = 0 - self.processMessage[con.message]( sock, data ) + for func in self.processMessage[con.message]: + func( sock, con.message, data ) else: return # wait for more data @@ -459,7 +474,8 @@ class Network: con.message = ord(con.recvBuf[0]) if MSG_SIZE[con.message] == 0: con.recvBuf = con.recvBuf[1:] - self.processMessage[con.message]( sock, "" ) + for func in self.processMessage[con.message]: + func( sock, con.message, "" ) else: con.waitingForData = MSG_SIZE[con.message] con.recvBuf = con.recvBuf[1:] @@ -468,36 +484,16 @@ class Network: self.processStream( sock ) #-- HOST handlers ------------------------------------------------------ - def processPR_LATENCY_QUERY( self, sock, data ): + def processPR_LATENCY_QUERY( self, sock, message, data ): self.send( HT_LATENCY_REPLY, data, sock ) - def processPR_SYNC_QUERY( self, sock, data ): - self.packer.pack_float(self.nextHeartbeat()) - self.send( HT_SYNC_REPLY, data + self.packer.get_buffer(), sock ) - self.packer.reset() - - def registerHeartbeat( self, handler ): - self.nextHeartbeat = handler - - #-- PEER handlers ------------------------------------------------------ - def processHT_LATENCY_REPLY( self, sock, data ): + #-- PEER handlers ------------------------------------------------------ + def processHT_LATENCY_REPLY( self, sock, message, data ): t = time.time() latency = t - self.latencyQueryStart[data] - print "got latency reply %d" % (latency*1000) + #print "got latency reply %d" % (latency*1000) self.latencyQueryHandler[data]( latency ) self.latencyQueryHandler.pop(data) self.latencyQueryStart.pop(data) - #self.queryLatency() - def processHT_SYNC_REPLY( self, sock, data ): - t = time.time() - hash = data[0:4] - latency = t - self.latencyQueryStart[hash] - print "got sync reply %d" % (latency*1000) - self.unpacker.reset(data[4:8]) - self.latencyQueryHandler[hash]( latency, self.unpacker.unpack_float() ) - self.latencyQueryHandler.pop(hash) - self.latencyQueryStart.pop(hash) - - |