1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# -*- tab-width: 4; indent-tabs-mode: t -*-
import socket
import threading
import traceback
import select
import time
import gobject
import SimpleXMLRPCServer
import SocketServer
__authinfos = {}
def _add_authinfo(authinfo):
__authinfos[threading.currentThread()] = authinfo
def get_authinfo():
return __authinfos.get(threading.currentThread())
def _del_authinfo():
del __authinfos[threading.currentThread()]
class GlibTCPServer(SocketServer.TCPServer):
"""GlibTCPServer
Integrate socket accept into glib mainloop.
"""
allow_reuse_address = True
request_queue_size = 20
def __init__(self, server_address, RequestHandlerClass):
SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
self.socket.setblocking(0) # Set nonblocking
# Watch the listener socket for data
gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
def _handle_accept(self, source, condition):
if not (condition & gobject.IO_IN):
return True
self.handle_request()
return True
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
""" GlibXMLRPCRequestHandler
The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
the client's address and/or SSL certificate into the function that actually
_processes_ the request. So we have to store it in a thread-indexed dict.
"""
def do_POST(self):
_add_authinfo(self.client_address)
try:
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
except socket.timeout:
pass
except socket.error, e:
print "Error (%s): socket error - '%s'" % (self.client_address, e)
_del_authinfo()
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
"""GlibXMLRPCServer
Use nonblocking sockets and handle the accept via glib rather than
blocking on accept().
"""
def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
self.logRequests = logRequests
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
GlibTCPServer.__init__(self, addr, requestHandler)
class GroupChatController(object):
_MAX_MSG_SIZE = 500
def __init__(self, address, port, data_cb):
self._address = address
self._port = port
self._data_cb = data_cb
self._setup_sender()
self._setup_listener()
def _setup_sender(self):
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make the socket multicast-aware, and set TTL.
self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
def _setup_listener(self):
# Listener socket
self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set some options to make it multicast-friendly
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
pass
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
def start(self):
# Set some more multicast options
self._listen_sock.bind(('', self._port))
self._listen_sock.settimeout(2)
intf = socket.gethostbyname(socket.gethostname())
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
# Watch the listener socket for data
gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
def _handle_incoming_data(self, source, condition):
if not (condition & gobject.IO_IN):
return True
msg = {}
msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
if self._data_cb:
self._data_cb(msg)
return True
def send_msg(self, data):
self._send_sock.sendto(data, (self._address, self._port))
|