diff options
author | amartin <olpc@xo-02-2E-11.localdomain> | 2007-06-19 23:39:43 (GMT) |
---|---|---|
committer | amartin <olpc@xo-02-2E-11.localdomain> | 2007-06-19 23:39:43 (GMT) |
commit | 97b554893c1215a1e35a81b1a13eaf807a3d5220 (patch) | |
tree | 13f6a382ca02b5b6251f5d50d66746c269183890 | |
parent | 016b7dfa8db5a70aa5f071b0927f896fad199566 (diff) |
network
-rw-r--r-- | Util/Network.py | 90 | ||||
-rw-r--r-- | miniTamTam/miniTamTamMain.py | 62 |
2 files changed, 96 insertions, 56 deletions
diff --git a/Util/Network.py b/Util/Network.py index c634507..ce72f08 100644 --- a/Util/Network.py +++ b/Util/Network.py @@ -126,10 +126,9 @@ class Network: # prepare message handlers self.processMessage = {} for i in range(1,MAX_MSG_ID): - try: - exec "self.processMessage[" + str(i) + "] = self.process" + MSG_NAME[i] - except: - print "Network:: message handler not defined for " + MSG_NAME[i] + self.processMessage[i] = [] + + self.statusWatcher = [] # data packing classes self.packer = xdrlib.Packer() @@ -148,11 +147,13 @@ class Network: self.outputSockets = [] self.exceptSockets = [] self.connection = {} # dict of connections indexed by socket - # self.processTimeout = False self.latencyQueryHandler = {} self.latencyQueryStart = {} + self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY ) + self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY ) + self.setMode( mode, hostaddress ) def shutdown( self ): @@ -193,10 +194,6 @@ class Network: self.socket = None self.hostAddress = None -# if self.processTimeout: -# gobject.source_remove( self.processTimeout ) -# self.processTimeout = False - # initialize new mode self.mode = mode @@ -259,9 +256,6 @@ class Network: self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) self.listener = None - #if self.mode != MD_OFFLINE: - # self.processTimeout = gobject.timeout_add( 10, self.process ) - def addPeer( self, peer, address ): if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0] self.connection[peer] = Connection( peer, address ) @@ -274,6 +268,37 @@ class Network: self.inputSockets.remove(peer) self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) + # register a status watcher, format: func( self, status, args ) + def addWatcher( self, func ): + self.statusWatcher.append( func ) + + def removeWatcher( self, func ): + self.statusWatcher.remove( func ) + + # connect a message handler, format: func( self, sock, message, data ) + def connectMessage( self, message, func ): + self.processMessage[message].append(func) + + def connectMessageAfter( self, message, func, after ): + try: + ind = self.processMessage[message].index(after) + self.processMessage[message].insert(ind+1,func) + except: + print "Network::connectMessageAfter:: function not registered: " + str(after) + + def connectMessageBefore( self, message, func, before ): + try: + ind = self.processMessage[message].index(before) + self.processMessage[message].insert(ind,func) + except: + print "Network::connectMessageBefore:: function not registered: " + str(before) + + def disconnectMessage( self, message, func ): + try: + self.processMessage[message].remove(func) + except: + print "Network::disconnectMessage:: function not registered: " + str(func) + def isOffline( self ): if self.mode == MD_OFFLINE: return True return False @@ -365,8 +390,7 @@ class Network: except socket.error, (value, errmsg): print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) # TODO something intelligent - - + def queryLatency( self, handler ): if self.mode != MD_PEER: return @@ -378,17 +402,6 @@ class Network: self.latencyQueryStart[hash] = time.time() self.send(PR_LATENCY_QUERY,hash) - def querySync( self, handler ): - if self.mode != MD_PEER: - return - - self.packer.pack_float(random.random()) - hash = self.packer.get_buffer() - self.packer.reset() - self.latencyQueryHandler[hash] = handler - self.latencyQueryStart[hash] = time.time() - self.send(PR_SYNC_QUERY,hash) - #----------------------------------------------------------------------- # Message Handlers @@ -451,7 +464,8 @@ class Network: data = con.recvBuf[0:con.waitingForData] con.recvBuf = con.recvBuf[con.waitingForData:] con.waitingForData = 0 - self.processMessage[con.message]( sock, data ) + for func in self.processMessage[con.message]: + func( sock, con.message, data ) else: return # wait for more data @@ -459,7 +473,8 @@ class Network: con.message = ord(con.recvBuf[0]) if MSG_SIZE[con.message] == 0: con.recvBuf = con.recvBuf[1:] - self.processMessage[con.message]( sock, "" ) + for func in self.processMessage[con.message]: + func( sock, con.message, "" ) else: con.waitingForData = MSG_SIZE[con.message] con.recvBuf = con.recvBuf[1:] @@ -471,15 +486,7 @@ class Network: def processPR_LATENCY_QUERY( self, sock, data ): self.send( HT_LATENCY_REPLY, data, sock ) - def processPR_SYNC_QUERY( self, sock, data ): - self.packer.pack_float(self.nextHeartbeat()) - self.send( HT_SYNC_REPLY, data + self.packer.get_buffer(), sock ) - self.packer.reset() - - def registerHeartbeat( self, handler ): - self.nextHeartbeat = handler - - #-- PEER handlers ------------------------------------------------------ + #-- PEER handlers ------------------------------------------------------ def processHT_LATENCY_REPLY( self, sock, data ): t = time.time() latency = t - self.latencyQueryStart[data] @@ -489,15 +496,4 @@ class Network: self.latencyQueryStart.pop(data) #self.queryLatency() - def processHT_SYNC_REPLY( self, sock, data ): - t = time.time() - hash = data[0:4] - latency = t - self.latencyQueryStart[hash] - #print "got sync reply %d" % (latency*1000) - self.unpacker.reset(data[4:8]) - self.latencyQueryHandler[hash]( latency, self.unpacker.unpack_float() ) - self.latencyQueryHandler.pop(hash) - self.latencyQueryStart.pop(hash) - - diff --git a/miniTamTam/miniTamTamMain.py b/miniTamTam/miniTamTamMain.py index 8a4180d..c14435e 100644 --- a/miniTamTam/miniTamTamMain.py +++ b/miniTamTam/miniTamTamMain.py @@ -5,11 +5,14 @@ import gobject import os import random import time +import xdrlib + from types import * from math import sqrt from Util.NoteDB import PARAMETER import Util.Network +Net = Util.Network # convinience assignment import Config @@ -37,9 +40,6 @@ class miniTamTamMain(SubActivity): def __init__(self, activity, set_mode): SubActivity.__init__(self, set_mode) - self.network = Util.Network.Network() - self.heartbeatStart = time.time() - self.network.registerHeartbeat( self.nextHeartbeat ) self.set_border_width(Config.MAIN_WINDOW_PADDING) @@ -92,6 +92,17 @@ class miniTamTamMain(SubActivity): self.synthLabWindow = None + self.heartbeatStart = time.time() + self.syncQueryStart = {} + + self.network = Net.Network() + self.network.connectMessage( Net.HT_SYNC_REPLY, self.processHT_SYNC_REPLY ) + self.network.connectMessage( Net.PR_SYNC_QUERY, self.processPR_SYNC_QUERY ) + + # data packing classes + self.packer = xdrlib.Packer() + self.unpacker = xdrlib.Unpacker("") + self.updateSync() self.syncTimeout = gobject.timeout_add( 1000, self.updateSync ) @@ -353,6 +364,9 @@ class miniTamTamMain(SubActivity): self.csnd.loopSetTempo(self.tempo) self.sequencer.tempo = widget.get_adjustment().value self.drumFillin.setTempo(self.tempo) + + if self.network.isHost(): + self.network.sendUpdateTempo( self.tempo ) img = int(self.scale( self.tempo, Config.PLAYER_TEMPO_LOWER,Config.PLAYER_TEMPO_UPPER, @@ -506,6 +520,41 @@ class miniTamTamMain(SubActivity): else: return result + + #----------------------------------------------------------------------- + # Network + + #-- Senders ------------------------------------------------------------ + + def querySync( self ): + self.packer.pack_float(random.random()) + hash = self.packer.get_buffer() + self.packer.reset() + self.syncQueryStart[hash] = time.time() + self.network.send( Net.PR_SYNC_QUERY, hash) + + #-- Handlers ----------------------------------------------------------- + + def processHT_SYNC_REPLY( self, sock, message, data ): + t = time.time() + hash = data[0:4] + latency = t - self.syncQueryStart[hash] + self.unpacker.reset(data[4:8]) + nextBeat = self.unpacker.unpack_float() + #print "mini:: got sync: next beat in %f, latency %d" % (nextBeat, latency*1000) + self.heartbeatStart = t + nextBeat - self.beatDuration - latency/2 + self.correctSync() + self.syncQueryStart.pop(hash) + + def processPR_SYNC_QUERY( self, sock, message, data ): + self.packer.pack_float(self.nextHeartbeat()) + self.network.send( Net.HT_SYNC_REPLY, data + self.packer.get_buffer(), sock ) + self.packer.reset() + + + #----------------------------------------------------------------------- + # Sync + def nextHeartbeat( self ): delta = time.time() - self.heartbeatStart return self.beatDuration - (delta % self.beatDuration) @@ -529,14 +578,9 @@ class miniTamTamMain(SubActivity): elif self.network.isHost(): self.correctSync() else: - self.network.querySync( self.handleSync ) + self.querySync() return True - def handleSync( self, latency, nextBeat ): - #print "mini:: got sync: next beat in %f, latency %d" % (nextBeat, latency*1000) - self.heartbeatStart = time.time() + nextBeat - self.beatDuration - latency/2 - self.correctSync() - def correctSync( self ): curTick = self.csnd.loopGetTick() curTicksIn = curTick % Config.TICKS_PER_BEAT |