diff options
author | Aleksey Lim <alsroot@activitycentral.org> | 2011-02-28 16:13:13 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@activitycentral.org> | 2011-02-28 16:13:13 (GMT) |
commit | a0705d8ff9b25c1172e38925ec27bb28f9e5a1e9 (patch) | |
tree | d3a20dff1d1e8d196fab33530fb21dd470addef5 /TamTamMini.activity/common/Util/Network.py | |
parent | 0c8e687ce284d7599b9bfb7c578b0fc7fb32c493 (diff) |
Revert "fixing simlimking build error"
This reverts commit 0c8e687ce284d7599b9bfb7c578b0fc7fb32c493.
Since common/ directory will be copied to .xo in setup.py anyway.
Diffstat (limited to 'TamTamMini.activity/common/Util/Network.py')
-rw-r--r-- | TamTamMini.activity/common/Util/Network.py | 572 |
1 files changed, 0 insertions, 572 deletions
diff --git a/TamTamMini.activity/common/Util/Network.py b/TamTamMini.activity/common/Util/Network.py deleted file mode 100644 index 79fbab5..0000000 --- a/TamTamMini.activity/common/Util/Network.py +++ /dev/null @@ -1,572 +0,0 @@ - -#=========================================================================== -# Networking Module -# -# - to force host mode create an empty file named "FORCE_HOST" in the base -# TamTam directory -# - to force peer mode create a file named "FORCE_PEER" with a single line -# containing the IP of the host to connect to -# -# !! the host must be running before the peers start up !! -#--------------------------------------------------------------------------- - -import os -import socket -import select -import threading -import xdrlib -import random - -import time -import gtk -import gobject -import common.Config as Config - -PORT = 24460 -LISTENER_PORT = PORT-1 -WAIT_PORT = PORT-2 - -BACKLOG = 5 # allow a backlog of N new connections -MAX_SIZE = 1024 # max message size to receive in one go - -MD_OFFLINE = 0 -MD_HOST = 1 -MD_PEER = 2 -MD_WAIT = 3 - -# enumerate message types -# format: ("NAME", <message size>) -# <message size> specified in bytes -# special: -# -1 == dynamic, first byte of data containes size -# -2 == dynamic, first uint32 of data contains size -message_enum = [ -("HT_LATENCY_REPLY", 4), # reply to latency test -("HT_SYNC_REPLY", 8), # reply to sync test -("HT_TEMPO_UPDATE", 4), # reply to sync test - -("PR_LATENCY_QUERY", 4), # test latency -("PR_SYNC_QUERY", 4), # test sync -("PR_TEMPO_QUERY", 0), # test sync -("PR_REQUEST_TEMPO_CHANGE", 4), # request tempo change - -("MAX_MSG_ID", 0) -] - -# Initialize message ids and MSG_NAME/MSG_SIZE arrays -MSG_NAME = [""] -MSG_SIZE = [0] -i = 1 -for m in message_enum: - exec "%s = %d" % (m[0],i) - MSG_NAME.append(m[0]) - MSG_SIZE.append(m[1]) - i += 1 -del message_enum # clear memory -if MAX_MSG_ID > 256: - print "Network:: MAX_MSG_ID exeeds limit of 256!" - - -class Listener( threading.Thread ): - - def __init__( self, owner, listenerSocket, inputSockets, outputSockets, exceptSockets ): - threading.Thread.__init__(self) - self.owner = owner - self.listenerSocket = listenerSocket - self.inputSockets = inputSockets # note that these are array pointers that match - self.outputSockets = outputSockets # those of the Network and should not be reset - self.exceptSockets = exceptSockets # - - def run(self): - while 1: # rely on the owner to kill us when necessary - try: - inputReady, outputReady, exceptReady = select.select( self.inputSockets, self.outputSockets, self.exceptSockets, 0.5 ) - if not len( inputReady ): # timeout - continue - if self.listenerSocket in inputReady: - data, s = self.listenerSocket.recvfrom(MAX_SIZE) - if data == "REFRESH": - continue - if data == "CLEAR": - self.owner._clearSockets() - continue - else: - break # exit thread - gtk.gdk.threads_enter() - self.owner._processSockets( inputReady ) - gtk.gdk.threads_leave() - except socket.error, (value, message): - print "Listener:: socket error: " + message - gtk.gdk.threads_leave() - break - -class Connection: - - def __init__( self, sock, address ): - self.socket = sock - self.address = address - self.recvBuf = "" - self.waitingForData = 0 - self.message = 0 - -class Network: - - def __init__( self, mode = MD_OFFLINE, hostaddress = None ): - - # check for forced networking - if os.path.isfile("FORCE_HOST"): - mode = MD_HOST - elif os.path.isfile("FORCE_PEER"): - f = open("FORCE_PEER") - l = f.read(16) - print l - f.close() - mode = MD_PEER - hostaddress = (l,PORT) - - # prepare message handlers - self.processMessage = {} - for i in range(1,MAX_MSG_ID): - self.processMessage[i] = [] - - self.statusWatcher = [] - - # data packing classes - self.packer = xdrlib.Packer() - self.unpacker = xdrlib.Unpacker("") - - self.mode = -1 - self.listener = None - self._fromListener = False - try: - self.listenerSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) - self.listenerSocket.bind( ("localhost", LISTENER_PORT) ) - except socket.error, (value,message): - print "Network:: FAILED to open listenerSocket: " + message - mode = MD_OFFLINE - - self.inputSockets = [ self.listenerSocket ] # NOTE that these array pointers are passed into - self.outputSockets = [] # the Listener and should not be reset - self.exceptSockets = [] # - self.connection = {} # dict of connections indexed by socket - - self.latencyQueryHandler = {} - self.latencyQueryStart = {} - - self.connectMessage( HT_LATENCY_REPLY, self.processHT_LATENCY_REPLY ) - self.connectMessage( PR_LATENCY_QUERY, self.processPR_LATENCY_QUERY ) - - self.setMode( mode, hostaddress ) - - def shutdown( self ): - if Config.DEBUG > 1: print "Network:: shutting down!" - - if self.listener: - self.listenerSocket.sendto( "EXIT", ("localhost",LISTENER_PORT) ) - time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS? - self.listener = None - - if self.mode == MD_HOST: - for s in self.inputSockets: - s.close() - elif self.mode == MD_PEER: - self.socket.close() - self.hostAddress = None - - def setMode( self, mode, hostaddress = None ): - - # cleanup old mode - if Config.DEBUG > 1: print "Network:: cleaning up old connections" - - if self._fromListener: - self._clearSockets() - elif self.listener: # make the listener wake so sockets can close properly - self.listenerSocket.sendto( "CLEAR", ("localhost",LISTENER_PORT) ) - time.sleep(0.01) # get off the cpu so the listerer thread has a chance to clear.. IS THERE A BETTER WAY TO DO THIS? - - self.hostAddress = None - - # initialize new mode - self.mode = mode - if self.mode == MD_HOST: - if Config.DEBUG > 1: print "Network:: initializing network, host mode" - try: - self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) - address = ("",PORT) - self.connection[self.socket] = Connection( self.socket, address ) - self.socket.bind(address) - self.socket.listen(BACKLOG) - self.inputSockets.append(self.socket) - if not self._fromListener and self.listener: - self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) - elif not self.listener: - self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) - self.listener.start() - except socket.error, (value, message): - if self.socket: - self.socket.close() - self.connection.pop(self.socket) - print "Network:: FAILED to open socket: " + message - self.mode = MD_OFFLINE - if self.listener: - self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) - self.listener = None - - elif self.mode == MD_PEER: - if Config.DEBUG > 1: print "Network:: initializing network, client mode: " + hostaddress[0] - self.hostAddress = hostaddress - try: - self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) - self.connection[self.socket] = Connection( self.socket, self.hostAddress ) - self.socket.connect(self.hostAddress) - self.inputSockets.append(self.socket) - if not self._fromListener and self.listener: - self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) - elif not self.listener: - self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) - self.listener.start() - except socket.error, (value, message): - if self.socket: - self.socket.close() - self.connection.pop(self.socket) - print "Network:: FAILED to open socket: " + message - self.mode = MD_OFFLINE - self.hostAddress = None - if self.listener: - self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) - self.listener = None - - elif self.mode == MD_WAIT: - if Config.DEBUG > 1: print "Network:: initializing network, wait mode" - try: - self.socket = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) - address = ("",WAIT_PORT) - self.connection[self.socket] = Connection( self.socket, address ) - self.socket.bind(address) - self.socket.listen(BACKLOG) - self.inputSockets.append(self.socket) - if not self._fromListener and self.listener: - self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) - elif not self.listener: - self.listener = Listener( self, self.listenerSocket, self.inputSockets, self.outputSockets, self.exceptSockets ) - self.listener.start() - except socket.error, (value, message): - if self.socket: - self.socket.close() - self.connection.pop(self.socket) - print "Network:: FAILED to open socket: " + message - self.mode = MD_OFFLINE - if self.listener: - self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) - self.listener = None - - else: - if Config.DEBUG > 1: print "Network:: offline" - if self.listener: - self.listenerSocket.sendto( "EXIT", ("localhost", LISTENER_PORT) ) - self.listener = None - - for watcher in self.statusWatcher: - watcher( self.mode ) - - def _clearSockets( self ): - for s in self.inputSockets: - if s != self.listenerSocket: - self.inputSockets.remove(s) - self.connection.pop(s) - s.close() - for s in self.outputSockets: - self.outputSockets.remove(s) - s.close() - for s in self.exceptSockets: - self.exceptSockets.remove(s) - s.close() - - - def introducePeer( self, ip ): - if Config.DEBUG > 1: print "Network:: introducing self to peer " + ip - try: - poke = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) - poke.setblocking(0) - except socket.error, (value, message): - print "Network::introducePeer:: FAILED to open socket: " + message - return - if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect - gobject.timeout_add( 500, self._pokePeer, poke, ip, 0 ) - else: # connected - if Config.DEBUG > 1: print "Netwtork:: introduction succeeded" - poke.close() - - def _pokePeer( self, poke, ip, retry ): - if poke.connect_ex( (ip, WAIT_PORT) ): # failed to connect - if retry > 120: # give up - print "Network::introducePeer:: peer failed to respond after 60 seconds, giving up!" - else: - gobject.timeout_add( 500, self._pokePeer, poke, ip, retry+1 ) - else: # connected - if Config.DEBUG > 1: print "Netwtork:: introduction succeeded" - poke.close() - - return False - - - def addPeer( self, peer, address ): - if Config.DEBUG > 1: print "Network:: adding peer: %s" % address[0] - self.connection[peer] = Connection( peer, address ) - self.inputSockets.append( peer ) - self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) - #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) - - def removePeer( self, peer ): - if Config.DEBUG > 1: print "Network:: removing peer: %s" % self.connection[peer].address[0] - self.connection.pop(peer) - self.inputSockets.remove(peer) - self.listenerSocket.sendto( "REFRESH", ("localhost", LISTENER_PORT) ) - #self.listener.updateSockets( self.inputSockets, self.outputSockets, self.exceptSockets ) - - # register a status watcher, format: func( self, status, args ) - def addWatcher( self, func ): - self.statusWatcher.append( func ) - - def removeWatcher( self, func ): - self.statusWatcher.remove( func ) - - # connect a message handler, format: func( self, sock, message, data ) - def connectMessage( self, message, func ): - self.processMessage[message].append(func) - - def connectMessageAfter( self, message, func, after ): - try: - ind = self.processMessage[message].index(after) - self.processMessage[message].insert(ind+1,func) - except: - print "Network::connectMessageAfter:: function not registered: " + str(after) - - def connectMessageBefore( self, message, func, before ): - try: - ind = self.processMessage[message].index(before) - self.processMessage[message].insert(ind,func) - except: - print "Network::connectMessageBefore:: function not registered: " + str(before) - - def disconnectMessage( self, message, func ): - try: - self.processMessage[message].remove(func) - except: - print "Network::disconnectMessage:: function not registered: " + str(func) - - def isOffline( self ): - if self.mode == MD_OFFLINE: return True - return False - - def isOnline( self ): - if self.mode != MD_OFFLINE: return True - return False - - def isHost( self ): - if self.mode == MD_HOST: return True - return False - - def isPeer( self ): - if self.mode == MD_PEER: return True - return False - - def isWaiting( self ): - if self.mode == MD_WAIT: return True - return False - - - #----------------------------------------------------------------------- - # Message Senders - - # basic send function - # - message type will be automatically inserted before the data - # - message size will be automatically inserted if applicable - # - to is only defined in HOST mode - def send( self, message, data = "", to = None ): - if self.mode == MD_OFFLINE: - return - - length = len(data) - size = MSG_SIZE[message] - - if size >= 0: - if length != size: - print "Network:: message wrong length! Got %d expected %d: %s" % (len(data), MSG_SIZE[message], MSG_NAME[message]) - return - msg = chr(message) + data - elif size == -1: - if length > 255: - print "Network:: oversized message! Got %d, max size 255: %s" % (length, MSG_NAME[message]) - return - msg = chr(message) + chr(length) + data - else: # size == -2 - self.packer.pack_uint(size) - msg = chr(message) + self.packer.get_buffer() + data - self.packer.reset() - - if self.mode == MD_PEER: - try: - self.socket.send( msg ) - #print "Network:: sent %d bytes" % (len(msg)) - except socket.error, (value, errmsg): - print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.hostAddress[0], errmsg) - # TODO something intelligent - else: # MD_HOST - try: - to.send( msg ) - #print "Network:: sent %d bytes" % (len(msg)) - except socket.error, (value, errmsg): - print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[to].address[0], errmsg) - # TODO something intelligent - - - def sendAll( self, message, data = "" ): - if self.mode != MD_HOST: - return - - length = len(data) - size = MSG_SIZE[message] - - if size >= 0: - if length != size: - print "Network:: message wrong length! Got %d expected %d: %s" % (MSG_SIZE[message], len(data), MSG_NAME[message]) - return - msg = chr(message) + data - elif size == -1: - if length > 255: - print "Network:: oversized message! Size %d, max size 255: %s" % (length, MSG_NAME[message]) - return - msg = chr(message) + chr(length) + data - else: # size == -2 - self.packer.pack_uint(size) - msg = chr(message) + self.packer.get_buffer() + data - self.packer.reset() - - for sock in self.connection: - if sock == self.socket: - continue - try: - sock.send( msg ) - except socket.error, (value, errmsg): - print "Network:: FAILED to send message (%s) to %s: %s" % (MSG_NAME[message], self.connection[sock].address[0], errmsg) - # TODO something intelligent - - def sendLatencyQuery( self, handler ): - if self.mode != MD_PEER: - return - - self.packer.pack_float(random.random()) - hash = self.packer.get_buffer() - self.packer.reset() - self.latencyQueryHandler[hash] = handler - self.latencyQueryStart[hash] = time.time() - self.send(PR_LATENCY_QUERY,hash) - - #----------------------------------------------------------------------- - # Message Handlers - - def _processSockets( self, inputReady ): - - self._fromListener = True - - if self.mode == MD_HOST: - - for s in inputReady: - if s == self.socket: - # accept new connections - try: - peer, address = self.socket.accept() - self.addPeer( peer, address ) - except socket.error, (value, message): - print "Network:: error accepting connection: " + message - - else: - try: - data = s.recv(MAX_SIZE) - #print "Network:: recv %d bytes: %s" % (len(data), data) - if not len(data): # no data to read, socket must be closed - self.removePeer(s) - else: - self.processStream( s, data ) - except socket.error, (value, message): - print "Network:: error reading data: " + message - - elif self.mode == MD_PEER: - - for s in inputReady: - try: - data = s.recv(MAX_SIZE) - if not len(data): # no data to read, socket must be closed - self.setMode( MD_OFFLINE ) - else: - #print "Network:: recv %d bytes: %s" % (len(data), data) - self.processStream( s, data ) - except socket.error, (value, message): - print "Network:: error reading data: " + message - - else: # MD_WAIT - - for s in inputReady: - try: - peer, address = self.socket.accept() - self.setMode( MD_PEER, (address[0], PORT) ) - except socket.error, (value, message): - print "Network:: error accepting connection: " + message - - self._fromListener = False - - - def processStream( self, sock, newData = "" ): - con = self.connection[sock] - con.recvBuf += newData - - if con.waitingForData == -1: # message size in char - con.waitingForData = ord(con.recvBuf[0]) - con.recvBuf = con.recvBuf[1:] - - elif con.waitingForData == -2: # message size in uint - if len(con.recvBuf) >= 4: - self.unpacker.reset(con.recvBuf[0:4]) - con.waitingForData = self.unpacker.unpack_uint() - con.recvBuf = con.recvBuf[4:] - else: - return # wait for more data - - elif con.waitingForData: - if len(con.recvBuf) >= con.waitingForData: - data = con.recvBuf[0:con.waitingForData] - con.recvBuf = con.recvBuf[con.waitingForData:] - con.waitingForData = 0 - for func in self.processMessage[con.message]: - gobject.idle_add( func, sock, con.message, data ) - else: - return # wait for more data - - else: - con.message = ord(con.recvBuf[0]) - if MSG_SIZE[con.message] == 0: - con.recvBuf = con.recvBuf[1:] - for func in self.processMessage[con.message]: - gobject.idle_add( func, sock, con.message, "" ) - else: - con.waitingForData = MSG_SIZE[con.message] - con.recvBuf = con.recvBuf[1:] - - if len(con.recvBuf): - self.processStream( sock ) - - #-- HOST handlers ------------------------------------------------------ - def processPR_LATENCY_QUERY( self, sock, message, data ): - self.send( HT_LATENCY_REPLY, data, sock ) - - #-- PEER handlers ------------------------------------------------------ - def processHT_LATENCY_REPLY( self, sock, message, data ): - t = time.time() - latency = t - self.latencyQueryStart[data] - #print "got latency reply %d" % (latency*1000) - self.latencyQueryHandler[data]( latency ) - self.latencyQueryHandler.pop(data) - self.latencyQueryStart.pop(data) - - |