Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoramartin <olpc@xo-02-2E-11.localdomain>2007-07-10 07:23:27 (GMT)
committer amartin <olpc@xo-02-2E-11.localdomain>2007-07-10 07:23:27 (GMT)
commit2be64abfdd7bc82fcf3016e68fba0428e064c6d7 (patch)
tree7f963d0b6b6100b5071555ff5f221657c6fd4040
parentb7c9b0845479870a5eb5eec6712694e29ecc7e79 (diff)
network
-rw-r--r--Util/Network.py110
-rw-r--r--miniTamTam/miniTamTamMain.py28
2 files changed, 76 insertions, 62 deletions
diff --git a/Util/Network.py b/Util/Network.py
index 60dc894..4d69ecd 100644
--- a/Util/Network.py
+++ b/Util/Network.py
@@ -72,14 +72,9 @@ class Listener( threading.Thread ):
threading.Thread.__init__(self)
self.owner = owner
self.listenerSocket = listenerSocket
- self.inputSockets = inputSockets[:]
- self.outputSockets = outputSockets[:]
- self.exceptSockets = exceptSockets[:]
-
- def updateSockets( self, inputSockets, outputSockets, exceptSockets ):
- self.inputSockets = inputSockets[:]
- self.outputSockets = outputSockets[:]
- self.exceptSockets = exceptSockets[:]
+ self.inputSockets = inputSockets # note that these are array pointers that match
+ self.outputSockets = outputSockets # those of the Network and should not be reset
+ self.exceptSockets = exceptSockets #
def run(self):
while 1: # rely on the owner to kill us when necessary
@@ -90,14 +85,12 @@ class Listener( threading.Thread ):
if data == "REFRESH":
continue
if data == "CLEAR":
- self.inputSockets = [ self.listenerSocket ]
- self.outputSockets = []
- self.exceptSockets = []
+ self.owner._clearSockets()
continue
else:
break # exit thread
gtk.gdk.threads_enter()
- self.owner.processSockets( inputReady, outputReady, exceptReady )
+ self.owner._processSockets( inputReady, outputReady, exceptReady )
gtk.gdk.threads_leave()
except socket.error, (value, message):
print "Listener:: socket error: " + message
@@ -141,6 +134,7 @@ class Network:
self.mode = -1
self.listener = None
+ self._fromListener = False
try:
self.listenerSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
self.listenerSocket.bind( ("localhost", LISTENER_PORT) )
@@ -148,9 +142,9 @@ class Network:
print "Network:: FAILED to open listenerSocket: " + message
mode = MD_OFFLINE
- self.inputSockets = [ self.listenerSocket ]
- self.outputSockets = []
- self.exceptSockets = []
+ self.inputSockets = [ self.listenerSocket ] # NOTE that these array pointers are passed into
+ self.outputSockets = [] # the Listener and should not be reset
+ self.exceptSockets = [] #
self.connection = {} # dict of connections indexed by socket
self.latencyQueryHandler = {}
@@ -177,37 +171,16 @@ class Network:
def setMode( self, mode, hostaddress = None ):
- if self.listener: # clear the listener so sockets can close properly
- self.listenerSocket.sendto( "CLEAR", ("localhost",LISTENER_PORT) )
- time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS?
-
# cleanup old mode
- if self.mode == MD_HOST:
- if Config.DEBUG > 1: print "Network:: host - cleaning up old connections"
- for s in self.inputSockets:
- if s != self.listenerSocket:
- s.close()
- self.connection.pop(s)
- self.socket.close()
- self.connection.pop(self.socket)
- self.socket = None
- self.inputSockets = [ self.listenerSocket ]
+ if Config.DEBUG > 1: print "Network:: cleaning up old connections"
- elif self.mode == MD_PEER:
- if Config.DEBUG > 1: print "Network:: peer - cleaning up old connections"
- self.socket.close()
- self.connection.pop(self.socket)
- self.socket = None
- self.hostAddress = None
- self.inputSockets = [ self.listenerSocket ]
-
- elif self.mode == MD_WAIT:
- if Config.DEBUG > 1: print "Network:: wait - cleaning up old connections"
- self.socket.close()
- self.connection.pop(self.socket)
- self.socket = None
- self.inputSockets = [ self.listenerSocket ]
+ if self._fromListener:
+ self._clearSockets()
+ elif self.listener: # make the listener wake so sockets can close properly
+ self.listenerSocket.sendto( "CLEAR", ("localhost",LISTENER_PORT) )
+ time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS?
+ self.hostAddress = None
# initialize new mode
self.mode = mode
@@ -218,13 +191,11 @@ class Network:
address = ("",PORT)
self.connection[self.socket] = Connection( self.socket, address )
self.socket.bind(address)
-# self.socket.setblocking(0)
self.socket.listen(BACKLOG)
self.inputSockets.append(self.socket)
- if self.listener:
- self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ if not self._fromListener and self.listener:
self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
- else:
+ elif not self.listener:
self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
self.listener.start()
except socket.error, (value, message):
@@ -243,13 +214,11 @@ class Network:
try:
self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.connection[self.socket] = Connection( self.socket, self.hostAddress )
- # self.socket.setblocking(0)
self.socket.connect(self.hostAddress)
self.inputSockets.append(self.socket)
- if self.listener:
- self.listener.updateSockets( self.inputsSockets, self.outputSockets, self.exceptSockets )
+ if not self._fromListener and self.listener:
self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
- else:
+ elif not self.listener:
self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
self.listener.start()
except socket.error, (value, message):
@@ -272,10 +241,9 @@ class Network:
self.socket.bind(address)
self.socket.listen(BACKLOG)
self.inputSockets.append(self.socket)
- if self.listener:
- self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ if not self._fromListener and self.listener:
self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
- else:
+ elif not self.listener:
self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
self.listener.start()
except socket.error, (value, message):
@@ -294,6 +262,23 @@ class Network:
self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
self.listener = None
+ for watcher in self.statusWatcher:
+ watcher( self.mode )
+
+ def _clearSockets( self ):
+ for s in self.inputSockets:
+ if s != self.listenerSocket:
+ self.inputSockets.remove(s)
+ self.connection.pop(s)
+ s.close()
+ for s in self.outputSockets:
+ self.outputSockets.remove(s)
+ s.close()
+ for s in self.exceptSockets:
+ self.exceptSockets.remove(s)
+ s.close()
+
+
def introducePeer( self, ip ):
if Config.DEBUG > 1: print "Network:: introducing self to peer " + ip
try:
@@ -325,13 +310,15 @@ class Network:
if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0]
self.connection[peer] = Connection( peer, address )
self.inputSockets.append( peer )
- self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
def removePeer( self, peer ):
if Config.DEBUG > 1: print "Network:: removing peer: %s" % self.connection[peer].address[0]
self.connection.pop(peer)
self.inputSockets.remove(peer)
- self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
# register a status watcher, format: func( self, status, args )
def addWatcher( self, func ):
@@ -380,6 +367,11 @@ class Network:
if self.mode == MD_PEER: return True
return False
+ def isWaiting( self ):
+ if self.mode == MD_WAIT: return True
+ return False
+
+
#-----------------------------------------------------------------------
# Message Senders
@@ -470,7 +462,9 @@ class Network:
#-----------------------------------------------------------------------
# Message Handlers
- def processSockets( self, inputReady, outputReady, exceptReady ):
+ def _processSockets( self, inputReady, outputReady, exceptReady ):
+
+ self._fromListener = True
if self.mode == MD_HOST:
@@ -516,6 +510,8 @@ class Network:
except socket.error, (value, message):
print "Network:: error accepting connection: " + message
+ self._fromListener = False
+
def processStream( self, sock, newData = "" ):
con = self.connection[sock]
diff --git a/miniTamTam/miniTamTamMain.py b/miniTamTam/miniTamTamMain.py
index 588cd08..191773c 100644
--- a/miniTamTam/miniTamTamMain.py
+++ b/miniTamTam/miniTamTamMain.py
@@ -100,8 +100,10 @@ class miniTamTamMain(SubActivity):
self.heartbeatStart = time.time()
self.syncQueryStart = {}
+ self.syncTimeout = None
self.network = Net.Network()
+ self.network.addWatcher( self.networkStatusWatcher )
self.network.connectMessage( Net.HT_SYNC_REPLY, self.processHT_SYNC_REPLY )
self.network.connectMessage( Net.HT_TEMPO_UPDATE, self.processHT_TEMPO_UPDATE )
self.network.connectMessage( Net.PR_SYNC_QUERY, self.processPR_SYNC_QUERY )
@@ -123,14 +125,14 @@ class miniTamTamMain(SubActivity):
if os.path.isfile("FORCE_SHARE"): # HOST
r = random.random()
- print "::::: Sharing as TamTam%f :::::" % r
- self.activity.set_title(_gettext("TamTam%f" % r))
+ print "::::: Sharing as TTDBG%f :::::" % r
+ self.activity.set_title(_gettext("TTDBG%f" % r))
self.activity.connect( "shared", self.shared )
self.activity.share()
elif self.activity._shared_activity: # PEER
self.activity._shared_activity.connect( "buddy-joined", self.buddy_joined )
self.activity._shared_activity.connect( "buddy-left", self.buddy_left )
- self.activity.connect( "joined", self.joinedTEMP )
+ self.activity.connect( "joined", self.joined )
self.network.setMode( Net.MD_WAIT )
def drawSliders( self ):
@@ -574,8 +576,10 @@ class miniTamTamMain(SubActivity):
self.activity._shared_activity.connect( "buddy-joined", self.buddy_joined )
self.activity._shared_activity.connect( "buddy-left", self.buddy_left )
self.network.setMode( Net.MD_HOST )
+ self.updateSync()
+ self.syncTimeout = gobject.timeout_add( 1000, self.updateSync )
- def joinedTEMP( self, activity ):
+ def joined( self, activity ):
print "miniTamTam:: joined activity!!"
for buddy in self.activity._shared_activity.get_joined_buddies():
print buddy.props.ip4_address
@@ -615,6 +619,17 @@ class miniTamTamMain(SubActivity):
#-- Handlers -----------------------------------------------------------
+ def networkStatusWatcher( self, mode ):
+ if mode == Net.MD_OFFLINE:
+ if self.syncTimeout:
+ gobject.source_remove( self.syncTimeout )
+ self.syncTimeout = None
+ if mode == Net.MD_PEER:
+ self.updateSync()
+ if not self.syncTimeout:
+ self.syncTimeout = gobject.timeout_add( 1000, self.updateSync )
+ self.sendTempoQuery()
+
def processHT_SYNC_REPLY( self, sock, message, data ):
t = time.time()
hash = data[0:4]
@@ -627,6 +642,7 @@ class miniTamTamMain(SubActivity):
self.syncQueryStart.pop(hash)
def processHT_TEMPO_UPDATE( self, sock, message, data ):
+ print "got tempo update"
self.unpacker.reset(data)
self.tempoAdjustment.set_value( self.unpacker.unpack_int() )
self.sendSyncQuery()
@@ -664,6 +680,8 @@ class miniTamTamMain(SubActivity):
def updateSync( self ):
if self.network.isOffline():
return False
+ elif self.network.isWaiting():
+ return True
elif self.network.isHost():
self.correctSync()
else:
@@ -685,7 +703,7 @@ class miniTamTamMain(SubActivity):
correct -= ticksPerLoop
elif correct < 0:
correct += ticksPerLoop
- print "correct:: %f ticks, %f ticks in, %f expected, %f err, correct %f" % (curTick, curTicksIn, ticksIn, err, correct)
+ #print "correct:: %f ticks, %f ticks in, %f expected, %f err, correct %f" % (curTick, curTicksIn, ticksIn, err, correct)
#if correct != curTick:
#self.csnd.loopSetTick(correct)
if abs(err) > 0.25: