# -*- tab-width: 4; indent-tabs-mode: t -*-
"""XML-RPC server and multicast group chat helpers driven by the glib main loop."""

import socket
import threading
import traceback
import select
import time
import gobject
import SimpleXMLRPCServer
import SocketServer


# Maps the thread handling a request to that request's auth info (the
# client address, as stored by GlibXMLRPCRequestHandler.do_POST).
__authinfos = {}


def _add_authinfo(authinfo):
	"""Record `authinfo` for the calling thread."""
	__authinfos[threading.currentThread()] = authinfo


def get_authinfo():
	"""Return the auth info recorded for the calling thread, or None if there is none."""
	return __authinfos.get(threading.currentThread())


def _del_authinfo():
	"""Forget the calling thread's auth info.

	Raises KeyError if nothing was recorded for this thread.
	"""
	del __authinfos[threading.currentThread()]


class GlibTCPServer(SocketServer.TCPServer):
	"""TCP server whose listening socket is watched by the glib main loop.

	Instead of blocking in accept(), the listening socket is made
	nonblocking and one request is handled each time glib reports the
	socket as readable.
	"""

	allow_reuse_address = True
	request_queue_size = 20

	def __init__(self, server_address, RequestHandlerClass):
		"""Bind the server and register its socket with the glib main loop."""
		SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
		self.socket.setblocking(0)  # Set nonblocking

		# Watch the listener socket for data
		gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)

	def _handle_accept(self, source, condition):
		"""glib IO callback: handle one pending request.

		Always returns True so that the watch stays installed.
		"""
		if not (condition & gobject.IO_IN):
			return True
		self.handle_request()
		return True


class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
	"""Request handler that exposes the client's address to the RPC methods.

	The stock SimpleXMLRPCRequestHandler and server don't allow any way to
	pass the client's address and/or SSL certificate into the function that
	actually _processes_ the request.  So we have to store it in a
	thread-indexed dict, readable through get_authinfo().
	"""

	def do_POST(self):
		"""Handle a POST with the client address published for this thread.

		Socket timeouts are ignored and other socket errors are reported on
		stdout; any other exception propagates to the caller.
		"""
		_add_authinfo(self.client_address)
		try:
			SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
		except socket.timeout:
			pass
		except socket.error as e:
			print("Error (%s): socket error - '%s'" % (self.client_address, e))
		finally:
			# Always drop the entry, even when an unexpected exception
			# escapes, so a stale address is never left behind for this thread.
			_del_authinfo()


class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
	"""XML-RPC server that accepts connections from the glib main loop.

	Use nonblocking sockets and handle the accept via glib rather than
	blocking on accept().
	"""

	def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
		"""Create the dispatcher and start listening on `addr`.

		addr -- address passed through to the underlying TCP server
		requestHandler -- request handler class used for each connection
		logRequests -- stored on the instance as `logRequests`
		"""
		self.logRequests = logRequests

		SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
		GlibTCPServer.__init__(self, addr, requestHandler)


class GroupChatController(object):
	"""Send and receive UDP datagrams on a multicast group.

	Incoming datagrams are read from a glib IO watch and handed to the
	`data_cb` callback as a dict with the keys 'data', 'addr' and 'port'.
	"""

	# Buffer size passed to recvfrom() for each incoming datagram.
	_MAX_MSG_SIZE = 500

	def __init__(self, address, port, data_cb):
		"""Create the sender and listener sockets.

		address -- multicast group address (dotted-quad string)
		port -- UDP port used for both sending and listening
		data_cb -- callable invoked with each received message dict; may be
			a false value to discard incoming data
		"""
		self._address = address
		self._port = port
		self._data_cb = data_cb

		self._setup_sender()
		self._setup_listener()

	def _setup_sender(self):
		"""Create the UDP socket used by send_msg()."""
		self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		# Make the socket multicast-aware, and set TTL.
		self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)  # Change TTL (=20) to suit

	def _setup_listener(self):
		"""Create the UDP listener socket and set its multicast options."""
		# Listener socket
		self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

		# Set some options to make it multicast-friendly
		self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		try:
			self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
		except (AttributeError, socket.error):
			# SO_REUSEPORT is best-effort: the constant may be missing from
			# the socket module, or the option may be rejected at runtime.
			pass
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)

	def start(self):
		"""Bind the listener, join the multicast group and start watching for data."""
		# Set some more multicast options
		self._listen_sock.bind(('', self._port))
		self._listen_sock.settimeout(2)
		# NOTE(review): this uses whatever address the local hostname
		# resolves to as the outgoing interface -- confirm that this is a
		# multicast-capable interface on the target systems.
		intf = socket.gethostbyname(socket.gethostname())
		self._listen_sock.setsockopt(
			socket.SOL_IP,
			socket.IP_MULTICAST_IF,
			socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
		self._listen_sock.setsockopt(
			socket.SOL_IP,
			socket.IP_ADD_MEMBERSHIP,
			socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))

		# Watch the listener socket for data
		gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)

	def _handle_incoming_data(self, source, condition):
		"""glib IO callback: read one datagram and pass it to the data callback.

		Always returns True so that the watch stays installed.
		"""
		if not (condition & gobject.IO_IN):
			return True

		try:
			data, (addr, port) = source.recvfrom(self._MAX_MSG_SIZE)
		except socket.timeout:
			# The listener has a 2 second timeout (see start()).  A socket
			# can be reported readable without a datagram being delivered,
			# so treat a timeout as "nothing to read" and keep listening.
			return True

		msg = {'data': data, 'addr': addr, 'port': port}
		if self._data_cb:
			self._data_cb(msg)
		return True

	def send_msg(self, data):
		"""Send `data` as one datagram to the multicast group address and port."""
		self._send_sock.sendto(data, (self._address, self._port))