diff options
Diffstat (limited to 'TamTamSynthLab.activity/common/Util/Network.py')
-rw-r--r-- | TamTamSynthLab.activity/common/Util/Network.py | 572 |
1 files changed, 572 insertions, 0 deletions
diff --git a/TamTamSynthLab.activity/common/Util/Network.py b/TamTamSynthLab.activity/common/Util/Network.py new file mode 100644 index 0000000..79fbab5 --- /dev/null +++ b/TamTamSynthLab.activity/common/Util/Network.py @@ -0,0 +1,572 @@ + +#=========================================================================== +# Networking Module +# +# - to force host mode create an empty file named "FORCE_HOST" in the base +# TamTam directory +# - to force peer mode create a file named "FORCE_PEER" with a single line +# containing the IP of the host to connect to +# +# !! the host must be running before the peers start up !! +#--------------------------------------------------------------------------- + +import os +import socket +import select +import threading +import xdrlib +import random + +import time +import gtk +import gobject +import common.Config as Config + +PORT = 24460 +LISTENER_PORT = PORT-1 +WAIT_PORT = PORT-2 + +BACKLOG = 5 # allow a backlog of N new connections +MAX_SIZE = 1024 # max message size to receive in one go + +MD_OFFLINE = 0 +MD_HOST = 1 +MD_PEER = 2 +MD_WAIT = 3 + +# enumerate message types +# format: ("NAME", <message size>) +# <message size> specified in bytes +# special: +# -1 == dynamic, first byte of data containes size +# -2 == dynamic, first uint32 of data contains size +message_enum = [ +("HT_LATENCY_REPLY", 4), # reply to latency test +("HT_SYNC_REPLY", 8), # reply to sync test +("HT_TEMPO_UPDATE", 4), # reply to sync test + +("PR_LATENCY_QUERY", 4), # test latency +("PR_SYNC_QUERY", 4), # test sync +("PR_TEMPO_QUERY", 0), # test sync +("PR_REQUEST_TEMPO_CHANGE", 4), # request tempo change + +("MAX_MSG_ID", 0) +] + +# Initialize message ids and MSG_NAME/MSG_SIZE arrays +MSG_NAME = [""] +MSG_SIZE = [0] +i = 1 +for m in message_enum: + exec "%s = %d" % (m[0],i) + MSG_NAME.append(m[0]) + MSG_SIZE.append(m[1]) + i += 1 +del message_enum # clear memory +if MAX_MSG_ID > 256: + print "Network:: MAX_MSG_ID exeeds limit of 256!" + + +class Listener( threading.Thread ): + + def __init__( self, owner, listenerSocket, inputSockets, outputSockets, exceptSockets ): + threading.Thread.__init__(self) + self.owner = owner + self.listenerSocket = listenerSocket + self.inputSockets = inputSockets # note that these are array pointers that match + self.outputSockets = outputSockets # those of the Network and should not be reset + self.exceptSockets = exceptSockets # + + def run(self): + while 1: # rely on the owner to kill us when necessary + try: + inputReady, outputReady, exceptReady = select.select( self.inputSockets, self.outputSockets, self.exceptSockets, 0.5 ) + if not len( inputReady ): # timeout + continue + if self.listenerSocket in inputReady: + data, s = self.listenerSocket.recvfrom(MAX_SIZE) + if data == "REFRESH": + continue + if data == "CLEAR": + self.owner._clearSockets() + continue + else: + break # exit thread + gtk.gdk.threads_enter() + self.owner._processSockets( inputReady ) + gtk.gdk.threads_leave() + except socket.error, (value, message): + print "Listener:: socket error: " + message + gtk.gdk.threads_leave() + break + +class Connection: + + def __init__( self, sock, address ): + self.socket = sock + self.address = address + self.recvBuf = "" + self.waitingForData = 0 + self.message = 0 + +class Network: + + def __init__( self, mode = MD_OFFLINE, hostaddress = None ): + + # check for forced networking + if os.path.isfile("FORCE_HOST"): + mode = MD_HOST + elif os.path.isfile("FORCE_PEER"): + f = open("FORCE_PEER") + l = f.read(16) + print l + f.close() + mode = MD_PEER + hostaddress = (l,PORT) + + # prepare message handlers + self.processMessage = {} + for i in range(1,MAX_MSG_ID): + self.processMessage[i] = [] + + self.statusWatcher = [] + + # data packing classes + self.packer = xdrlib.Packer() + self.unpacker = xdrlib.Unpacker("") + + self.mode = -1 + self.listener = None + self._fromListener = False + try: + self.listenerSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) + self.listenerSocket.bind( ("localhost", LISTENER_PORT) ) + except socket.error, (value,message): + print "Network:: FAILED to open listenerSocket: " + message + mode = MD_OFFLINE + + self.inputSockets = [ self.listenerSocket ] # NOTE that these array pointers are passed into + self.outputSockets = [] # the Listener and should not be reset + self.exceptSockets = [] # + self.connection = {} # dict of connections indexed by socket + + self.latencyQueryHandler = {} + self.latencyQueryStart = {} + + self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY ) + self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY ) + + self.setMode( mode, hostaddress ) + + def shutdown( self ): + if Config.DEBUG > 1: print "Network:: shutting down!" + + if self.listener: + self.listenerSocket.sendto( "EXIT", ("localhost",LISTENER_PORT) ) + time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS? + self.listener = None + + if self.mode == MD_HOST: + for s in self.inputSockets: + s.close() + elif self.mode == MD_PEER: + self.socket.close() + self.hostAddress = None + + def setMode( self, mode, hostaddress = None ): + + # cleanup old mode + if Config.DEBUG > 1: print "Network:: cleaning up old connections" + + if self._fromListener: + self._clearSockets() + elif self.listener: # make the listener wake so sockets can close properly + self.listenerSocket.sendto( "CLEAR", ("localhost",LISTENER_PORT) ) + time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS? + + self.hostAddress = None + + # initialize new mode + self.mode = mode + if self.mode == MD_HOST: + if Config.DEBUG > 1: print "Network:: initializing network, host mode" + try: + self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) + address = ("",PORT) + self.connection[self.socket] = Connection( self.socket, address ) + self.socket.bind(address) + self.socket.listen(BACKLOG) + self.inputSockets.append(self.socket) + if not self._fromListener and self.listener: + self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) + elif not self.listener: + self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) + self.listener.start() + except socket.error, (value, message): + if self.socket: + self.socket.close() + self.connection.pop(self.socket) + print "Network:: FAILED to open socket: " + message + self.mode = MD_OFFLINE + if self.listener: + self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) + self.listener = None + + elif self.mode == MD_PEER: + if Config.DEBUG > 1: print "Network:: initializing network, client mode: " + hostaddress[0] + self.hostAddress = hostaddress + try: + self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) + self.connection[self.socket] = Connection( self.socket, self.hostAddress ) + self.socket.connect(self.hostAddress) + self.inputSockets.append(self.socket) + if not self._fromListener and self.listener: + self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) + elif not self.listener: + self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) + self.listener.start() + except socket.error, (value, message): + if self.socket: + self.socket.close() + self.connection.pop(self.socket) + print "Network:: FAILED to open socket: " + message + self.mode = MD_OFFLINE + self.hostAddress = None + if self.listener: + self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) + self.listener = None + + elif self.mode == MD_WAIT: + if Config.DEBUG > 1: print "Network:: initializing network, wait mode" + try: + self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) + address = ("",WAIT_PORT) + self.connection[self.socket] = Connection( self.socket, address ) + self.socket.bind(address) + self.socket.listen(BACKLOG) + self.inputSockets.append(self.socket) + if not self._fromListener and self.listener: + self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) + elif not self.listener: + self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) + self.listener.start() + except socket.error, (value, message): + if self.socket: + self.socket.close() + self.connection.pop(self.socket) + print "Network:: FAILED to open socket: " + message + self.mode = MD_OFFLINE + if self.listener: + self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) + self.listener = None + + else: + if Config.DEBUG > 1: print "Network:: offline" + if self.listener: + self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) + self.listener = None + + for watcher in self.statusWatcher: + watcher( self.mode ) + + def _clearSockets( self ): + for s in self.inputSockets: + if s != self.listenerSocket: + self.inputSockets.remove(s) + self.connection.pop(s) + s.close() + for s in self.outputSockets: + self.outputSockets.remove(s) + s.close() + for s in self.exceptSockets: + self.exceptSockets.remove(s) + s.close() + + + def introducePeer( self, ip ): + if Config.DEBUG > 1: print "Network:: introducing self to peer " + ip + try: + poke = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) + poke.setblocking(0) + except socket.error, (value, message): + print "Network::introducePeer:: FAILED to open socket: " + message + return + if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect + gobject.timeout_add( 500, self._pokePeer, poke, ip, 0 ) + else: # connected + if Config.DEBUG > 1: print "Netwtork:: introduction succeeded" + poke.close() + + def _pokePeer( self, poke, ip, retry ): + if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect + if retry > 120: # give up + print "Network::introducePeer:: peer failed to respond after 60 seconds, giving up!" + else: + gobject.timeout_add( 500, self._pokePeer, poke, ip, retry+1 ) + else: # connected + if Config.DEBUG > 1: print "Netwtork:: introduction succeeded" + poke.close() + + return False + + + def addPeer( self, peer, address ): + if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0] + self.connection[peer] = Connection( peer, address ) + self.inputSockets.append( peer ) + self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) + #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) + + def removePeer( self, peer ): + if Config.DEBUG > 1: print "Network:: removing peer: %s" % self.connection[peer].address[0] + self.connection.pop(peer) + self.inputSockets.remove(peer) + self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) + #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) + + # register a status watcher, format: func( self, status, args ) + def addWatcher( self, func ): + self.statusWatcher.append( func ) + + def removeWatcher( self, func ): + self.statusWatcher.remove( func ) + + # connect a message handler, format: func( self, sock, message, data ) + def connectMessage( self, message, func ): + self.processMessage[message].append(func) + + def connectMessageAfter( self, message, func, after ): + try: + ind = self.processMessage[message].index(after) + self.processMessage[message].insert(ind+1,func) + except: + print "Network::connectMessageAfter:: function not registered: " + str(after) + + def connectMessageBefore( self, message, func, before ): + try: + ind = self.processMessage[message].index(before) + self.processMessage[message].insert(ind,func) + except: + print "Network::connectMessageBefore:: function not registered: " + str(before) + + def disconnectMessage( self, message, func ): + try: + self.processMessage[message].remove(func) + except: + print "Network::disconnectMessage:: function not registered: " + str(func) + + def isOffline( self ): + if self.mode == MD_OFFLINE: return True + return False + + def isOnline( self ): + if self.mode != MD_OFFLINE: return True + return False + + def isHost( self ): + if self.mode == MD_HOST: return True + return False + + def isPeer( self ): + if self.mode == MD_PEER: return True + return False + + def isWaiting( self ): + if self.mode == MD_WAIT: return True + return False + + + #----------------------------------------------------------------------- + # Message Senders + + # basic send function + # - message type will be automatically inserted before the data + # - message size will be automatically inserted if applicable + # - to is only defined in HOST mode + def send( self, message, data = "", to = None ): + if self.mode == MD_OFFLINE: + return + + length = len(data) + size = MSG_SIZE[message] + + if size >= 0: + if length != size: + print "Network:: message wrong length! Got %d expected %d: %s" % (len(data), MSG_SIZE[message], MSG_NAME[message]) + return + msg = chr(message) + data + elif size == -1: + if length > 255: + print "Network:: oversized message! Got %d, max size 255: %s" % (length, MSG_NAME[message]) + return + msg = chr(message) + chr(length) + data + else: # size == -2 + self.packer.pack_uint(size) + msg = chr(message) + self.packer.get_buffer() + data + self.packer.reset() + + if self.mode == MD_PEER: + try: + self.socket.send( msg ) + #print "Network:: sent %d bytes" % (len(msg)) + except socket.error, (value, errmsg): + print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.hostAddress[0], errmsg) + # TODO something intelligent + else: # MD_HOST + try: + to.send( msg ) + #print "Network:: sent %d bytes" % (len(msg)) + except socket.error, (value, errmsg): + print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) + # TODO something intelligent + + + def sendAll( self, message, data = "" ): + if self.mode != MD_HOST: + return + + length = len(data) + size = MSG_SIZE[message] + + if size >= 0: + if length != size: + print "Network:: message wrong length! Got %d expected %d: %s" % (MSG_SIZE[message], len(data), MSG_NAME[message]) + return + msg = chr(message) + data + elif size == -1: + if length > 255: + print "Network:: oversized message! Size %d, max size 255: %s" % (length, MSG_NAME[message]) + return + msg = chr(message) + chr(length) + data + else: # size == -2 + self.packer.pack_uint(size) + msg = chr(message) + self.packer.get_buffer() + data + self.packer.reset() + + for sock in self.connection: + if sock == self.socket: + continue + try: + sock.send( msg ) + except socket.error, (value, errmsg): + print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[sock].address[0], errmsg) + # TODO something intelligent + + def sendLatencyQuery( self, handler ): + if self.mode != MD_PEER: + return + + self.packer.pack_float(random.random()) + hash = self.packer.get_buffer() + self.packer.reset() + self.latencyQueryHandler[hash] = handler + self.latencyQueryStart[hash] = time.time() + self.send(PR_LATENCY_QUERY,hash) + + #----------------------------------------------------------------------- + # Message Handlers + + def _processSockets( self, inputReady ): + + self._fromListener = True + + if self.mode == MD_HOST: + + for s in inputReady: + if s == self.socket: + # accept new connections + try: + peer, address = self.socket.accept() + self.addPeer( peer, address ) + except socket.error, (value, message): + print "Network:: error accepting connection: " + message + + else: + try: + data = s.recv(MAX_SIZE) + #print "Network:: recv %d bytes: %s" % (len(data), data) + if not len(data): # no data to read, socket must be closed + self.removePeer(s) + else: + self.processStream( s, data ) + except socket.error, (value, message): + print "Network:: error reading data: " + message + + elif self.mode == MD_PEER: + + for s in inputReady: + try: + data = s.recv(MAX_SIZE) + if not len(data): # no data to read, socket must be closed + self.setMode( MD_OFFLINE ) + else: + #print "Network:: recv %d bytes: %s" % (len(data), data) + self.processStream( s, data ) + except socket.error, (value, message): + print "Network:: error reading data: " + message + + else: # MD_WAIT + + for s in inputReady: + try: + peer, address = self.socket.accept() + self.setMode( MD_PEER, (address[0], PORT) ) + except socket.error, (value, message): + print "Network:: error accepting connection: " + message + + self._fromListener = False + + + def processStream( self, sock, newData = "" ): + con = self.connection[sock] + con.recvBuf += newData + + if con.waitingForData == -1: # message size in char + con.waitingForData = ord(con.recvBuf[0]) + con.recvBuf = con.recvBuf[1:] + + elif con.waitingForData == -2: # message size in uint + if len(con.recvBuf) >= 4: + self.unpacker.reset(con.recvBuf[0:4]) + con.waitingForData = self.unpacker.unpack_uint() + con.recvBuf = con.recvBuf[4:] + else: + return # wait for more data + + elif con.waitingForData: + if len(con.recvBuf) >= con.waitingForData: + data = con.recvBuf[0:con.waitingForData] + con.recvBuf = con.recvBuf[con.waitingForData:] + con.waitingForData = 0 + for func in self.processMessage[con.message]: + gobject.idle_add( func, sock, con.message, data ) + else: + return # wait for more data + + else: + con.message = ord(con.recvBuf[0]) + if MSG_SIZE[con.message] == 0: + con.recvBuf = con.recvBuf[1:] + for func in self.processMessage[con.message]: + gobject.idle_add( func, sock, con.message, "" ) + else: + con.waitingForData = MSG_SIZE[con.message] + con.recvBuf = con.recvBuf[1:] + + if len(con.recvBuf): + self.processStream( sock ) + + #-- HOST handlers ------------------------------------------------------ + def processPR_LATENCY_QUERY( self, sock, message, data ): + self.send( HT_LATENCY_REPLY, data, sock ) + + #-- PEER handlers ------------------------------------------------------ + def processHT_LATENCY_REPLY( self, sock, message, data ): + t = time.time() + latency = t - self.latencyQueryStart[data] + #print "got latency reply %d" % (latency*1000) + self.latencyQueryHandler[data]( latency ) + self.latencyQueryHandler.pop(data) + self.latencyQueryStart.pop(data) + + |