Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/p2p/network.py
blob: c88ede6cbbe1b4e013866f0d9978bbddc76724ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# -*- tab-width: 4; indent-tabs-mode: t -*- 

import socket
import threading
import traceback
import select
import time
import xmlrpclib
import sys

import gobject
import SimpleXMLRPCServer
import SocketServer

# Registry of client auth info, keyed by the Thread object that stored it.
__authinfos = {}

def _add_authinfo(authinfo):
	"""Store authinfo for the calling thread, replacing any earlier entry."""
	caller = threading.currentThread()
	__authinfos[caller] = authinfo

def get_authinfo():
	"""Return the authinfo stored by the calling thread, or None if there is none."""
	caller = threading.currentThread()
	return __authinfos.get(caller, None)

def _del_authinfo():
	"""Drop the calling thread's authinfo; raises KeyError if none is stored."""
	caller = threading.currentThread()
	__authinfos.pop(caller)


class GlibTCPServer(SocketServer.TCPServer):
	"""GlibTCPServer

	A TCPServer whose listening socket is serviced from the glib mainloop
	instead of a blocking accept() loop.
	"""

	allow_reuse_address = True
	request_queue_size = 20

	def __init__(self, server_address, RequestHandlerClass):
		"""Run the stock TCPServer setup, then hook the listener into glib."""
		SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)

		listener = self.socket
		listener.setblocking(0)  # nonblocking listener

		# Ask glib to call _handle_accept whenever the listener is readable
		gobject.io_add_watch(listener, gobject.IO_IN, self._handle_accept)

	def _handle_accept(self, source, condition):
		"""glib IO callback: service one request when IO_IN is signalled.

		Returns True in every case, so the callback never asks glib to
		drop the watch.
		"""
		if condition & gobject.IO_IN:
			self.handle_request()
		return True

class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
	""" GlibXMLRPCRequestHandler
	
	The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
	the client's address and/or SSL certificate into the function that actually
	_processes_ the request.  So we have to store it in a thread-indexed dict.
	"""

	def do_POST(self):
		"""Handle a POST while the client address is visible via get_authinfo().

		The client address is registered for the current thread before the
		stock do_POST runs and is always unregistered afterwards.  Socket
		timeouts are ignored; other socket errors and any other exception
		are printed rather than propagated.
		"""
		_add_authinfo(self.client_address)
		# Nested try/finally (rather than try/except/finally) keeps this valid
		# on older Python 2 releases while guaranteeing the registry entry is
		# removed even if one of the handlers below raises.
		try:
			try:
				SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
			except socket.timeout:
				pass
			except socket.error, e:
				print "Error (%s): socket error - '%s'" % (self.client_address, e)
			except:
				print "Error while processing POST:"
				traceback.print_exc()
		finally:
			_del_authinfo()

class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
	"""GlibXMLRPCServer
	
	Use nonblocking sockets and handle the accept via glib rather than
	blocking on accept().
	"""

	def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
		self.logRequests = logRequests

		SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
		GlibTCPServer.__init__(self, addr, requestHandler)

	def _marshaled_dispatch(self, data, dispatch_method = None):
		"""Dispatches an XML-RPC method from marshalled (XML) data.

		XML-RPC methods are dispatched from the marshalled (XML) data
		using the _dispatch method and the result is returned as
		marshalled data. For backwards compatibility, a dispatch
		function can be provided as an argument (see comment in
		SimpleXMLRPCRequestHandler.do_POST) but overriding the
		existing method through subclassing is the prefered means
		of changing method dispatch behavior.
		"""

		params, method = xmlrpclib.loads(data)

		# generate response
		try:
			if dispatch_method is not None:
				response = dispatch_method(method, params)
			else:
				response = self._dispatch(method, params)
			# wrap response in a singleton tuple
			response = (response,)
			response = xmlrpclib.dumps(response, methodresponse=1)
		except xmlrpclib.Fault, fault:
			response = xmlrpclib.dumps(fault)
		except:
			print "Exception while processing request:"
			traceback.print_exc()

			# report exception back to server
			response = xmlrpclib.dumps(
				xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
				)

		return response

class GroupServer(object):
	"""GroupServer

	Receives UDP datagrams for a multicast group and hands each one to a
	callback from the glib mainloop.
	"""

	# Maximum number of bytes read from the socket per datagram
	_MAX_MSG_SIZE = 500

	def __init__(self, address, port, data_cb):
		"""Create the listener socket; call start() to begin receiving.

		address -- dotted-quad group address joined in start()
		port -- UDP port bound in start()
		data_cb -- called with a dict holding 'data', 'addr' and 'port' for
		           every datagram received; may be None to discard them
		"""
		self._address = address
		self._port = port
		self._data_cb = data_cb

		self._setup_listener()

	def _setup_listener(self):
		"""Create the UDP socket and apply the options that need no binding."""
		# Listener socket
		self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

		# Set some options to make it multicast-friendly
		self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
		try:
			self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
		except (AttributeError, socket.error):
			# Best effort only: the constant may be missing from the socket
			# module, or the OS may refuse the option.
			pass
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)

	def start(self):
		"""Bind the socket, join the multicast group and start the glib watch."""
		# Set some more multicast options
		self._listen_sock.bind(('', self._port))
		self._listen_sock.settimeout(2)
		# Interface address is whatever this machine's hostname resolves to
		intf = socket.gethostbyname(socket.gethostname())
		# NOTE(review): an 8-byte value is passed for IP_MULTICAST_IF here; how
		# it is interpreted is platform-dependent -- confirm on target systems.
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
		self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))

		# Watch the listener socket for data
		gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)

	def _handle_incoming_data(self, source, condition):
		"""glib IO callback: read one datagram and pass it to the data callback.

		Returns True whether or not a datagram was read.
		"""
		if not (condition & gobject.IO_IN):
			return True
		msg = {}
		msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
		if self._data_cb:
			self._data_cb(msg)
		return True

class GroupClient(object):
	"""GroupClient

	Sends UDP datagrams to a fixed (address, port) destination through a
	socket whose multicast TTL has been set.
	"""

	_MAX_MSG_SIZE = 500

	def __init__(self, address, port):
		"""Open the sending socket and remember where messages should go."""
		# Datagram socket, multicast TTL of 20 (change the TTL to suit)
		sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		sender.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)

		self._address = address
		self._port = port
		self._send_sock = sender

	def send_msg(self, data):
		"""Send data as one datagram to the configured address and port."""
		destination = (self._address, self._port)
		self._send_sock.sendto(data, destination)