Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/Util
diff options
context:
space:
mode:
authorOli <olivier.belanger@umontreal.ca>2007-06-19 21:07:56 (GMT)
committer Oli <olivier.belanger@umontreal.ca>2007-06-19 21:07:56 (GMT)
commitf369cb9982618ae3c54d29f6bb3b51714b4b70f7 (patch)
tree35656a059eab41bb33435b57c3ecc3b2e0b04743 /Util
parent5c9a928310120a7ae95c6dde45b9f9ddc18c0c57 (diff)
parent6dac818750901beb90b79584b41e2df9be673c7c (diff)
Merge branch 'master' of git+ssh://olipet@dev.laptop.org/git/projects/tamtam
Diffstat (limited to 'Util')
-rw-r--r--Util/Network.py120
1 files changed, 58 insertions, 62 deletions
diff --git a/Util/Network.py b/Util/Network.py
index f90ea6f..f884beb 100644
--- a/Util/Network.py
+++ b/Util/Network.py
@@ -40,9 +40,11 @@ MD_PEER = 2
message_enum = [
("HT_LATENCY_REPLY", 4), # reply to latency test
("HT_SYNC_REPLY", 8), # reply to sync test
+("HT_TEMPO_UPDATE", 4), # reply to sync test
("PR_LATENCY_QUERY", 4), # test latency
("PR_SYNC_QUERY", 4), # test sync
+("PR_TEMPO_QUERY", 4), # test sync
("MAX_MSG_ID", 0)
]
@@ -126,10 +128,9 @@ class Network:
# prepare message handlers
self.processMessage = {}
for i in range(1,MAX_MSG_ID):
- try:
- exec "self.processMessage[" + str(i) + "] = self.process" + MSG_NAME[i]
- except:
- print "Network:: message handler not defined for " + MSG_NAME[i]
+ self.processMessage[i] = []
+
+ self.statusWatcher = []
# data packing classes
self.packer = xdrlib.Packer()
@@ -148,11 +149,13 @@ class Network:
self.outputSockets = []
self.exceptSockets = []
self.connection = {} # dict of connections indexed by socket
- # self.processTimeout = False
self.latencyQueryHandler = {}
self.latencyQueryStart = {}
+ self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY )
+ self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY )
+
self.setMode( mode, hostaddress )
def shutdown( self ):
@@ -193,10 +196,6 @@ class Network:
self.socket = None
self.hostAddress = None
-# if self.processTimeout:
-# gobject.source_remove( self.processTimeout )
-# self.processTimeout = False
-
# initialize new mode
self.mode = mode
@@ -205,7 +204,7 @@ class Network:
try:
self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
address = ("",PORT)
- self.connection[socket] = Connection( self.socket, address )
+ self.connection[self.socket] = Connection( self.socket, address )
self.socket.bind(address)
# self.socket.setblocking(0)
self.socket.listen(BACKLOG)
@@ -241,7 +240,6 @@ class Network:
else:
self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
self.listener.start()
- self.queryLatency( lambda x: (x*1000) )
except socket.error, (value, message):
if self.socket:
self.socket.close()
@@ -259,9 +257,6 @@ class Network:
self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
self.listener = None
- #if self.mode != MD_OFFLINE:
- # self.processTimeout = gobject.timeout_add( 10, self.process )
-
def addPeer( self, peer, address ):
if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0]
self.connection[peer] = Connection( peer, address )
@@ -274,6 +269,37 @@ class Network:
self.inputSockets.remove(peer)
self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ # register a status watcher, format: func( self, status, args )
+ def addWatcher( self, func ):
+ self.statusWatcher.append( func )
+
+ def removeWatcher( self, func ):
+ self.statusWatcher.remove( func )
+
+ # connect a message handler, format: func( self, sock, message, data )
+ def connectMessage( self, message, func ):
+ self.processMessage[message].append(func)
+
+ def connectMessageAfter( self, message, func, after ):
+ try:
+ ind = self.processMessage[message].index(after)
+ self.processMessage[message].insert(ind+1,func)
+ except:
+ print "Network::connectMessageAfter:: function not registered: " + str(after)
+
+ def connectMessageBefore( self, message, func, before ):
+ try:
+ ind = self.processMessage[message].index(before)
+ self.processMessage[message].insert(ind,func)
+ except:
+ print "Network::connectMessageBefore:: function not registered: " + str(before)
+
+ def disconnectMessage( self, message, func ):
+ try:
+ self.processMessage[message].remove(func)
+ except:
+ print "Network::disconnectMessage:: function not registered: " + str(func)
+
def isOffline( self ):
if self.mode == MD_OFFLINE: return True
return False
@@ -322,14 +348,14 @@ class Network:
if self.mode == MD_PEER:
try:
self.socket.send( msg )
- print "Network:: sent %d bytes: %s" % (len(msg),msg)
+ #print "Network:: sent %d bytes: %s" % (len(msg),msg)
except socket.error, (value, errmsg):
print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.hostAddress[0], errmsg)
# TODO something intelligent
else: # MD_HOST
try:
to.send( msg )
- print "Network:: sent %d bytes: %s" % (len(msg),msg)
+ #print "Network:: sent %d bytes: %s" % (len(msg),msg)
except socket.error, (value, errmsg):
print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg)
# TODO something intelligent
@@ -357,17 +383,16 @@ class Network:
msg = chr(message) + self.packer.get_buffer() + data
self.packer.reset()
- for con in self.connection:
- if con.socket == self.socket:
+ for sock in self.connection:
+ if sock == self.socket:
continue
try:
- con.socket.send( msg )
+ sock.send( msg )
except socket.error, (value, errmsg):
- print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg)
+ print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[sock].address[0], errmsg)
# TODO something intelligent
-
-
- def queryLatency( self, handler ):
+
+ def sendLatencyQuery( self, handler ):
if self.mode != MD_PEER:
return
@@ -378,17 +403,6 @@ class Network:
self.latencyQueryStart[hash] = time.time()
self.send(PR_LATENCY_QUERY,hash)
- def querySync( self, handler ):
- if self.mode != MD_PEER:
- return
-
- self.packer.pack_float(random.random())
- hash = self.packer.get_buffer()
- self.packer.reset()
- self.latencyQueryHandler[hash] = handler
- self.latencyQueryStart[hash] = time.time()
- self.send(PR_SYNC_QUERY,hash)
-
#-----------------------------------------------------------------------
# Message Handlers
@@ -408,7 +422,7 @@ class Network:
else:
try:
data = s.recv(MAX_SIZE)
- print "Network:: recv %d bytes: %s" % (len(data), data)
+ #print "Network:: recv %d bytes: %s" % (len(data), data)
if not len(data): # no data to read, socket must be closed
self.removePeer(s)
else:
@@ -424,7 +438,7 @@ class Network:
if not len(data): # no data to read, socket must be closed
self.setMode( MD_OFFLINE )
else:
- print "Network:: recv %d bytes: %s" % (len(data), data)
+ #print "Network:: recv %d bytes: %s" % (len(data), data)
self.processStream( s, data )
except socket.error, (value, message):
print "Network:: error reading data: " + message
@@ -451,7 +465,8 @@ class Network:
data = con.recvBuf[0:con.waitingForData]
con.recvBuf = con.recvBuf[con.waitingForData:]
con.waitingForData = 0
- self.processMessage[con.message]( sock, data )
+ for func in self.processMessage[con.message]:
+ func( sock, con.message, data )
else:
return # wait for more data
@@ -459,7 +474,8 @@ class Network:
con.message = ord(con.recvBuf[0])
if MSG_SIZE[con.message] == 0:
con.recvBuf = con.recvBuf[1:]
- self.processMessage[con.message]( sock, "" )
+ for func in self.processMessage[con.message]:
+ func( sock, con.message, "" )
else:
con.waitingForData = MSG_SIZE[con.message]
con.recvBuf = con.recvBuf[1:]
@@ -468,36 +484,16 @@ class Network:
self.processStream( sock )
#-- HOST handlers ------------------------------------------------------
- def processPR_LATENCY_QUERY( self, sock, data ):
+ def processPR_LATENCY_QUERY( self, sock, message, data ):
self.send( HT_LATENCY_REPLY, data, sock )
- def processPR_SYNC_QUERY( self, sock, data ):
- self.packer.pack_float(self.nextHeartbeat())
- self.send( HT_SYNC_REPLY, data + self.packer.get_buffer(), sock )
- self.packer.reset()
-
- def registerHeartbeat( self, handler ):
- self.nextHeartbeat = handler
-
- #-- PEER handlers ------------------------------------------------------
- def processHT_LATENCY_REPLY( self, sock, data ):
+ #-- PEER handlers ------------------------------------------------------
+ def processHT_LATENCY_REPLY( self, sock, message, data ):
t = time.time()
latency = t - self.latencyQueryStart[data]
- print "got latency reply %d" % (latency*1000)
+ #print "got latency reply %d" % (latency*1000)
self.latencyQueryHandler[data]( latency )
self.latencyQueryHandler.pop(data)
self.latencyQueryStart.pop(data)
- #self.queryLatency()
- def processHT_SYNC_REPLY( self, sock, data ):
- t = time.time()
- hash = data[0:4]
- latency = t - self.latencyQueryStart[hash]
- print "got sync reply %d" % (latency*1000)
- self.unpacker.reset(data[4:8])
- self.latencyQueryHandler[hash]( latency, self.unpacker.unpack_float() )
- self.latencyQueryHandler.pop(hash)
- self.latencyQueryStart.pop(hash)
-
-