Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/common/Util/Network.py
diff options
context:
space:
mode:
authorNat <natcl@hotmail.com>2007-09-13 15:55:52 (GMT)
committer Nat <natcl@hotmail.com>2007-09-13 15:55:52 (GMT)
commite12dbff4dda5aafbaac98f75f0467ef00dc06c32 (patch)
tree52f74f5a699ca1a2827b333e76a7225b7d768256 /common/Util/Network.py
parentb94ccdfd2329ed2d1128a4392e2f67b1e6b704da (diff)
Activity split
Diffstat (limited to 'common/Util/Network.py')
-rw-r--r--common/Util/Network.py569
1 files changed, 569 insertions, 0 deletions
diff --git a/common/Util/Network.py b/common/Util/Network.py
new file mode 100644
index 0000000..8e20a04
--- /dev/null
+++ b/common/Util/Network.py
@@ -0,0 +1,569 @@
+
+#===========================================================================
+# Networking Module
+#
+# - to force host mode create an empty file named "FORCE_HOST" in the base
+# TamTam directory
+# - to force peer mode create a file named "FORCE_PEER" with a single line
+# containing the IP of the host to connect to
+#
+# !! the host must be running before the peers start up !!
+#---------------------------------------------------------------------------
+
+import os
+import socket
+import select
+import threading
+import xdrlib
+import random
+
+import time
+import gtk
+import gobject
+import common.Config as Config
+
+PORT = 24420
+LISTENER_PORT = PORT-1
+WAIT_PORT = PORT-2
+
+BACKLOG = 5 # allow a backlog of N new connections
+MAX_SIZE = 1024 # max message size to receive in one go
+
+MD_OFFLINE = 0
+MD_HOST = 1
+MD_PEER = 2
+MD_WAIT = 3
+
+# enumerate message types
+# format: ("NAME", <message size>)
+# <message size> specified in bytes
+# special:
+# -1 == dynamic, first byte of data containes size
+# -2 == dynamic, first uint32 of data contains size
+message_enum = [
+("HT_LATENCY_REPLY", 4), # reply to latency test
+("HT_SYNC_REPLY", 8), # reply to sync test
+("HT_TEMPO_UPDATE", 4), # reply to sync test
+
+("PR_LATENCY_QUERY", 4), # test latency
+("PR_SYNC_QUERY", 4), # test sync
+("PR_TEMPO_QUERY", 0), # test sync
+("PR_REQUEST_TEMPO_CHANGE", 4), # request tempo change
+
+("MAX_MSG_ID", 0)
+]
+
+# Initialize message ids and MSG_NAME/MSG_SIZE arrays
+MSG_NAME = [""]
+MSG_SIZE = [0]
+i = 1
+for m in message_enum:
+ exec "%s = %d" % (m[0],i)
+ MSG_NAME.append(m[0])
+ MSG_SIZE.append(m[1])
+ i += 1
+del message_enum # clear memory
+if MAX_MSG_ID > 256:
+ print "Network:: MAX_MSG_ID exeeds limit of 256!"
+
+
+class Listener( threading.Thread ):
+
+ def __init__( self, owner, listenerSocket, inputSockets, outputSockets, exceptSockets ):
+ threading.Thread.__init__(self)
+ self.owner = owner
+ self.listenerSocket = listenerSocket
+ self.inputSockets = inputSockets # note that these are array pointers that match
+ self.outputSockets = outputSockets # those of the Network and should not be reset
+ self.exceptSockets = exceptSockets #
+
+ def run(self):
+ while 1: # rely on the owner to kill us when necessary
+ try:
+ inputReady, outputReady, exceptReady = select.select( self.inputSockets, self.outputSockets, self.exceptSockets )
+ if self.listenerSocket in inputReady:
+ data, s = self.listenerSocket.recvfrom(MAX_SIZE)
+ if data == "REFRESH":
+ continue
+ if data == "CLEAR":
+ self.owner._clearSockets()
+ continue
+ else:
+ break # exit thread
+ gtk.gdk.threads_enter()
+ self.owner._processSockets( inputReady, outputReady, exceptReady )
+ gtk.gdk.threads_leave()
+ except socket.error, (value, message):
+ print "Listener:: socket error: " + message
+ gtk.gdk.threads_leave()
+ break
+
+class Connection:
+
+ def __init__( self, sock, address ):
+ self.socket = sock
+ self.address = address
+ self.recvBuf = ""
+ self.waitingForData = 0
+ self.message = 0
+
+class Network:
+
+ def __init__( self, mode = MD_OFFLINE, hostaddress = None ):
+
+ # check for forced networking
+ if os.path.isfile("FORCE_HOST"):
+ mode = MD_HOST
+ elif os.path.isfile("FORCE_PEER"):
+ f = open("FORCE_PEER")
+ l = f.read(16)
+ print l
+ f.close()
+ mode = MD_PEER
+ hostaddress = (l,PORT)
+
+ # prepare message handlers
+ self.processMessage = {}
+ for i in range(1,MAX_MSG_ID):
+ self.processMessage[i] = []
+
+ self.statusWatcher = []
+
+ # data packing classes
+ self.packer = xdrlib.Packer()
+ self.unpacker = xdrlib.Unpacker("")
+
+ self.mode = -1
+ self.listener = None
+ self._fromListener = False
+ try:
+ self.listenerSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
+ self.listenerSocket.bind( ("localhost", LISTENER_PORT) )
+ except socket.error, (value,message):
+ print "Network:: FAILED to open listenerSocket: " + message
+ mode = MD_OFFLINE
+
+ self.inputSockets = [ self.listenerSocket ] # NOTE that these array pointers are passed into
+ self.outputSockets = [] # the Listener and should not be reset
+ self.exceptSockets = [] #
+ self.connection = {} # dict of connections indexed by socket
+
+ self.latencyQueryHandler = {}
+ self.latencyQueryStart = {}
+
+ self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY )
+ self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY )
+
+ self.setMode( mode, hostaddress )
+
+ def shutdown( self ):
+ if Config.DEBUG > 1: print "Network:: shutting down!"
+
+ if self.listener:
+ self.listenerSocket.sendto( "EXIT", ("localhost",LISTENER_PORT) )
+ time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS?
+
+ if self.mode == MD_HOST:
+ for s in self.inputSockets:
+ s.close()
+ elif self.mode == MD_PEER:
+ self.socket.close()
+ self.hostAddress = None
+
+ def setMode( self, mode, hostaddress = None ):
+
+ # cleanup old mode
+ if Config.DEBUG > 1: print "Network:: cleaning up old connections"
+
+ if self._fromListener:
+ self._clearSockets()
+ elif self.listener: # make the listener wake so sockets can close properly
+ self.listenerSocket.sendto( "CLEAR", ("localhost",LISTENER_PORT) )
+ time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS?
+
+ self.hostAddress = None
+
+ # initialize new mode
+ self.mode = mode
+ if self.mode == MD_HOST:
+ if Config.DEBUG > 1: print "Network:: initializing network, host mode"
+ try:
+ self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+ address = ("",PORT)
+ self.connection[self.socket] = Connection( self.socket, address )
+ self.socket.bind(address)
+ self.socket.listen(BACKLOG)
+ self.inputSockets.append(self.socket)
+ if not self._fromListener and self.listener:
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ elif not self.listener:
+ self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
+ self.listener.start()
+ except socket.error, (value, message):
+ if self.socket:
+ self.socket.close()
+ self.connection.pop(self.socket)
+ print "Network:: FAILED to open socket: " + message
+ self.mode = MD_OFFLINE
+ if self.listener:
+ self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
+ self.listener = None
+
+ elif self.mode == MD_PEER:
+ if Config.DEBUG > 1: print "Network:: initializing network, client mode: " + hostaddress[0]
+ self.hostAddress = hostaddress
+ try:
+ self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+ self.connection[self.socket] = Connection( self.socket, self.hostAddress )
+ self.socket.connect(self.hostAddress)
+ self.inputSockets.append(self.socket)
+ if not self._fromListener and self.listener:
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ elif not self.listener:
+ self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
+ self.listener.start()
+ except socket.error, (value, message):
+ if self.socket:
+ self.socket.close()
+ self.connection.pop(self.socket)
+ print "Network:: FAILED to open socket: " + message
+ self.mode = MD_OFFLINE
+ self.hostAddress = None
+ if self.listener:
+ self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
+ self.listener = None
+
+ elif self.mode == MD_WAIT:
+ if Config.DEBUG > 1: print "Network:: initializing network, wait mode"
+ try:
+ self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+ address = ("",WAIT_PORT)
+ self.connection[self.socket] = Connection( self.socket, address )
+ self.socket.bind(address)
+ self.socket.listen(BACKLOG)
+ self.inputSockets.append(self.socket)
+ if not self._fromListener and self.listener:
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ elif not self.listener:
+ self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets )
+ self.listener.start()
+ except socket.error, (value, message):
+ if self.socket:
+ self.socket.close()
+ self.connection.pop(self.socket)
+ print "Network:: FAILED to open socket: " + message
+ self.mode = MD_OFFLINE
+ if self.listener:
+ self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
+ self.listener = None
+
+ else:
+ if Config.DEBUG > 1: print "Network:: offline"
+ if self.listener:
+ self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) )
+ self.listener = None
+
+ for watcher in self.statusWatcher:
+ watcher( self.mode )
+
+ def _clearSockets( self ):
+ for s in self.inputSockets:
+ if s != self.listenerSocket:
+ self.inputSockets.remove(s)
+ self.connection.pop(s)
+ s.close()
+ for s in self.outputSockets:
+ self.outputSockets.remove(s)
+ s.close()
+ for s in self.exceptSockets:
+ self.exceptSockets.remove(s)
+ s.close()
+
+
+ def introducePeer( self, ip ):
+ if Config.DEBUG > 1: print "Network:: introducing self to peer " + ip
+ try:
+ poke = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+ poke.setblocking(0)
+ except socket.error, (value, message):
+ print "Network::introducePeer:: FAILED to open socket: " + message
+ return
+ if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect
+ gobject.timeout_add( 500, self._pokePeer, poke, ip, 0 )
+ else: # connected
+ if Config.DEBUG > 1: print "Netwtork:: introduction succeeded"
+ poke.close()
+
+ def _pokePeer( self, poke, ip, retry ):
+ if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect
+ if retry > 120: # give up
+ print "Network::introducePeer:: peer failed to respond after 60 seconds, giving up!"
+ else:
+ gobject.timeout_add( 500, self._pokePeer, poke, ip, retry+1 )
+ else: # connected
+ if Config.DEBUG > 1: print "Netwtork:: introduction succeeded"
+ poke.close()
+
+ return False
+
+
+ def addPeer( self, peer, address ):
+ if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0]
+ self.connection[peer] = Connection( peer, address )
+ self.inputSockets.append( peer )
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+
+ def removePeer( self, peer ):
+ if Config.DEBUG > 1: print "Network:: removing peer: %s" % self.connection[peer].address[0]
+ self.connection.pop(peer)
+ self.inputSockets.remove(peer)
+ self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) )
+ #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets )
+
+ # register a status watcher, format: func( self, status, args )
+ def addWatcher( self, func ):
+ self.statusWatcher.append( func )
+
+ def removeWatcher( self, func ):
+ self.statusWatcher.remove( func )
+
+ # connect a message handler, format: func( self, sock, message, data )
+ def connectMessage( self, message, func ):
+ self.processMessage[message].append(func)
+
+ def connectMessageAfter( self, message, func, after ):
+ try:
+ ind = self.processMessage[message].index(after)
+ self.processMessage[message].insert(ind+1,func)
+ except:
+ print "Network::connectMessageAfter:: function not registered: " + str(after)
+
+ def connectMessageBefore( self, message, func, before ):
+ try:
+ ind = self.processMessage[message].index(before)
+ self.processMessage[message].insert(ind,func)
+ except:
+ print "Network::connectMessageBefore:: function not registered: " + str(before)
+
+ def disconnectMessage( self, message, func ):
+ try:
+ self.processMessage[message].remove(func)
+ except:
+ print "Network::disconnectMessage:: function not registered: " + str(func)
+
+ def isOffline( self ):
+ if self.mode == MD_OFFLINE: return True
+ return False
+
+ def isOnline( self ):
+ if self.mode != MD_OFFLINE: return True
+ return False
+
+ def isHost( self ):
+ if self.mode == MD_HOST: return True
+ return False
+
+ def isPeer( self ):
+ if self.mode == MD_PEER: return True
+ return False
+
+ def isWaiting( self ):
+ if self.mode == MD_WAIT: return True
+ return False
+
+
+ #-----------------------------------------------------------------------
+ # Message Senders
+
+ # basic send function
+ # - message type will be automatically inserted before the data
+ # - message size will be automatically inserted if applicable
+ # - to is only defined in HOST mode
+ def send( self, message, data = "", to = None ):
+ if self.mode == MD_OFFLINE:
+ return
+
+ length = len(data)
+ size = MSG_SIZE[message]
+
+ if size >= 0:
+ if length != size:
+ print "Network:: message wrong length! Got %d expected %d: %s" % (len(data), MSG_SIZE[message], MSG_NAME[message])
+ return
+ msg = chr(message) + data
+ elif size == -1:
+ if length > 255:
+ print "Network:: oversized message! Got %d, max size 255: %s" % (length, MSG_NAME[message])
+ return
+ msg = chr(message) + chr(length) + data
+ else: # size == -2
+ self.packer.pack_uint(size)
+ msg = chr(message) + self.packer.get_buffer() + data
+ self.packer.reset()
+
+ if self.mode == MD_PEER:
+ try:
+ self.socket.send( msg )
+ #print "Network:: sent %d bytes: %s" % (len(msg),msg)
+ except socket.error, (value, errmsg):
+ print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.hostAddress[0], errmsg)
+ # TODO something intelligent
+ else: # MD_HOST
+ try:
+ to.send( msg )
+ #print "Network:: sent %d bytes: %s" % (len(msg),msg)
+ except socket.error, (value, errmsg):
+ print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg)
+ # TODO something intelligent
+
+
+ def sendAll( self, message, data = "" ):
+ if self.mode != MD_HOST:
+ return
+
+ length = len(data)
+ size = MSG_SIZE[message]
+
+ if size >= 0:
+ if length != size:
+ print "Network:: message wrong length! Got %d expected %d: %s" % (MSG_SIZE[message], len(data), MSG_NAME[message])
+ return
+ msg = chr(message) + data
+ elif size == -1:
+ if length > 255:
+ print "Network:: oversized message! Size %d, max size 255: %s" % (length, MSG_NAME[message])
+ return
+ msg = chr(message) + chr(length) + data
+ else: # size == -2
+ self.packer.pack_uint(size)
+ msg = chr(message) + self.packer.get_buffer() + data
+ self.packer.reset()
+
+ for sock in self.connection:
+ if sock == self.socket:
+ continue
+ try:
+ sock.send( msg )
+ except socket.error, (value, errmsg):
+ print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[sock].address[0], errmsg)
+ # TODO something intelligent
+
+ def sendLatencyQuery( self, handler ):
+ if self.mode != MD_PEER:
+ return
+
+ self.packer.pack_float(random.random())
+ hash = self.packer.get_buffer()
+ self.packer.reset()
+ self.latencyQueryHandler[hash] = handler
+ self.latencyQueryStart[hash] = time.time()
+ self.send(PR_LATENCY_QUERY,hash)
+
+ #-----------------------------------------------------------------------
+ # Message Handlers
+
+ def _processSockets( self, inputReady, outputReady, exceptReady ):
+
+ self._fromListener = True
+
+ if self.mode == MD_HOST:
+
+ for s in inputReady:
+ if s == self.socket:
+ # accept new connections
+ try:
+ peer, address = self.socket.accept()
+ self.addPeer( peer, address )
+ except socket.error, (value, message):
+ print "Network:: error accepting connection: " + message
+
+ else:
+ try:
+ data = s.recv(MAX_SIZE)
+ #print "Network:: recv %d bytes: %s" % (len(data), data)
+ if not len(data): # no data to read, socket must be closed
+ self.removePeer(s)
+ else:
+ self.processStream( s, data )
+ except socket.error, (value, message):
+ print "Network:: error reading data: " + message
+
+ elif self.mode == MD_PEER:
+
+ for s in inputReady:
+ try:
+ data = s.recv(MAX_SIZE)
+ if not len(data): # no data to read, socket must be closed
+ self.setMode( MD_OFFLINE )
+ else:
+ #print "Network:: recv %d bytes: %s" % (len(data), data)
+ self.processStream( s, data )
+ except socket.error, (value, message):
+ print "Network:: error reading data: " + message
+
+ else: # MD_WAIT
+
+ for s in inputReady:
+ try:
+ peer, address = self.socket.accept()
+ self.setMode( MD_PEER, (address[0], PORT) )
+ except socket.error, (value, message):
+ print "Network:: error accepting connection: " + message
+
+ self._fromListener = False
+
+
+ def processStream( self, sock, newData = "" ):
+ con = self.connection[sock]
+ con.recvBuf += newData
+
+ if con.waitingForData == -1: # message size in char
+ con.waitingForData = ord(con.recvBuf[0])
+ con.recvBuf = con.recvBuf[1:]
+
+ elif con.waitingForData == -2: # message size in uint
+ if len(con.recvBuf) >= 4:
+ self.unpacker.reset(con.recvBuf[0:4])
+ con.waitingForData = self.unpacker.unpack_uint()
+ con.recvBuf = con.recvBuf[4:]
+ else:
+ return # wait for more data
+
+ elif con.waitingForData:
+ if len(con.recvBuf) >= con.waitingForData:
+ data = con.recvBuf[0:con.waitingForData]
+ con.recvBuf = con.recvBuf[con.waitingForData:]
+ con.waitingForData = 0
+ for func in self.processMessage[con.message]:
+ gobject.idle_add( func, sock, con.message, data )
+ else:
+ return # wait for more data
+
+ else:
+ con.message = ord(con.recvBuf[0])
+ if MSG_SIZE[con.message] == 0:
+ con.recvBuf = con.recvBuf[1:]
+ for func in self.processMessage[con.message]:
+ gobject.idle_add( func, sock, con.message, "" )
+ else:
+ con.waitingForData = MSG_SIZE[con.message]
+ con.recvBuf = con.recvBuf[1:]
+
+ if len(con.recvBuf):
+ self.processStream( sock )
+
+ #-- HOST handlers ------------------------------------------------------
+ def processPR_LATENCY_QUERY( self, sock, message, data ):
+ self.send( HT_LATENCY_REPLY, data, sock )
+
+ #-- PEER handlers ------------------------------------------------------
+ def processHT_LATENCY_REPLY( self, sock, message, data ):
+ t = time.time()
+ latency = t - self.latencyQueryStart[data]
+ #print "got latency reply %d" % (latency*1000)
+ self.latencyQueryHandler[data]( latency )
+ self.latencyQueryHandler.pop(data)
+ self.latencyQueryStart.pop(data)
+
+