Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/Util/Network.py
diff options
context:
space:
mode:
authoramartin <olpc@xo-02-2E-11.localdomain>2007-06-19 23:39:43 (GMT)
committer amartin <olpc@xo-02-2E-11.localdomain>2007-06-19 23:39:43 (GMT)
commit97b554893c1215a1e35a81b1a13eaf807a3d5220 (patch)
tree13f6a382ca02b5b6251f5d50d66746c269183890 /Util/Network.py
parent016b7dfa8db5a70aa5f071b0927f896fad199566 (diff)
network
Diffstat (limited to 'Util/Network.py')
-rw-r--r--Util/Network.py90
1 files changed, 43 insertions, 47 deletions
diff --git a/Util/Network.py b/Util/Network.py
index c634507..ce72f08 100644
--- a/Util/Network.py
+++ b/Util/Network.py
@@ -126,10 +126,9 @@ class Network:
# prepare message handlers
self.processMessage = {}
for i in range(1,MAX_MSG_ID):
- try:
- exec "self.processMessage[" + str(i) + "] = self.process" + MSG_NAME[i]
- except:
- print "Network:: message handler not defined for " + MSG_NAME[i]
+ self.processMessage[i] = []
+
+ self.statusWatcher = []
# data packing classes
self.packer = xdrlib.Packer()
@@ -148,11 +147,13 @@ class Network:
self.outputSockets = []
self.exceptSockets = []
self.connection = {} # dict of connections indexed by socket
- # self.processTimeout = False
self.latencyQueryHandler = {}
self.latencyQueryStart = {}
+ self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY )
+ self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY )
+
self.setMode( mode, hostaddress )
def shutdown( self ):
@@ -193,10 +194,6 @@ class Network:
self.socket = None
self.hostAddress = None
-# if self.processTimeout:
-# gobject.source_remove( self.processTimeout )
-# self.processTimeout = False
-
# initialize new mode
self.mode = mode
@@ -259,9 +256,6 @@ class Network:
self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
self.listener = None
- #if self.mode != MD_OFFLINE:
- # self.processTimeout = gobject.timeout_add( 10, self.process )
-
def addPeer( self, peer, address ):
if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0]
self.connection[peer] = Connection( peer, address )
@@ -274,6 +268,37 @@ class Network:
self.inputSockets.remove(peer)
self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ # register a status watcher, format: func( self, status, args )
+ def addWatcher( self, func ):
+ self.statusWatcher.append( func )
+
+ def removeWatcher( self, func ):
+ self.statusWatcher.remove( func )
+
+ # connect a message handler, format: func( self, sock, message, data )
+ def connectMessage( self, message, func ):
+ self.processMessage[message].append(func)
+
+ def connectMessageAfter( self, message, func, after ):
+ try:
+ ind = self.processMessage[message].index(after)
+ self.processMessage[message].insert(ind+1,func)
+ except:
+ print "Network::connectMessageAfter:: function not registered: " + str(after)
+
+ def connectMessageBefore( self, message, func, before ):
+ try:
+ ind = self.processMessage[message].index(before)
+ self.processMessage[message].insert(ind,func)
+ except:
+ print "Network::connectMessageBefore:: function not registered: " + str(before)
+
+ def disconnectMessage( self, message, func ):
+ try:
+ self.processMessage[message].remove(func)
+ except:
+ print "Network::disconnectMessage:: function not registered: " + str(func)
+
def isOffline( self ):
if self.mode == MD_OFFLINE: return True
return False
@@ -365,8 +390,7 @@ class Network:
except socket.error, (value, errmsg):
print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg)
# TODO something intelligent
-
-
+
def queryLatency( self, handler ):
if self.mode != MD_PEER:
return
@@ -378,17 +402,6 @@ class Network:
self.latencyQueryStart[hash] = time.time()
self.send(PR_LATENCY_QUERY,hash)
- def querySync( self, handler ):
- if self.mode != MD_PEER:
- return
-
- self.packer.pack_float(random.random())
- hash = self.packer.get_buffer()
- self.packer.reset()
- self.latencyQueryHandler[hash] = handler
- self.latencyQueryStart[hash] = time.time()
- self.send(PR_SYNC_QUERY,hash)
-
#-----------------------------------------------------------------------
# Message Handlers
@@ -451,7 +464,8 @@ class Network:
data = con.recvBuf[0:con.waitingForData]
con.recvBuf = con.recvBuf[con.waitingForData:]
con.waitingForData = 0
- self.processMessage[con.message]( sock, data )
+ for func in self.processMessage[con.message]:
+ func( sock, con.message, data )
else:
return # wait for more data
@@ -459,7 +473,8 @@ class Network:
con.message = ord(con.recvBuf[0])
if MSG_SIZE[con.message] == 0:
con.recvBuf = con.recvBuf[1:]
- self.processMessage[con.message]( sock, "" )
+ for func in self.processMessage[con.message]:
+ func( sock, con.message, "" )
else:
con.waitingForData = MSG_SIZE[con.message]
con.recvBuf = con.recvBuf[1:]
@@ -471,15 +486,7 @@ class Network:
def processPR_LATENCY_QUERY( self, sock, data ):
self.send( HT_LATENCY_REPLY, data, sock )
- def processPR_SYNC_QUERY( self, sock, data ):
- self.packer.pack_float(self.nextHeartbeat())
- self.send( HT_SYNC_REPLY, data + self.packer.get_buffer(), sock )
- self.packer.reset()
-
- def registerHeartbeat( self, handler ):
- self.nextHeartbeat = handler
-
- #-- PEER handlers ------------------------------------------------------
+ #-- PEER handlers ------------------------------------------------------
def processHT_LATENCY_REPLY( self, sock, data ):
t = time.time()
latency = t - self.latencyQueryStart[data]
@@ -489,15 +496,4 @@ class Network:
self.latencyQueryStart.pop(data)
#self.queryLatency()
- def processHT_SYNC_REPLY( self, sock, data ):
- t = time.time()
- hash = data[0:4]
- latency = t - self.latencyQueryStart[hash]
- #print "got sync reply %d" % (latency*1000)
- self.unpacker.reset(data[4:8])
- self.latencyQueryHandler[hash]( latency, self.unpacker.unpack_float() )
- self.latencyQueryHandler.pop(hash)
- self.latencyQueryStart.pop(hash)
-
-