Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p/network.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar/p2p/network.py')
-rw-r--r--sugar/p2p/network.py574
1 files changed, 287 insertions, 287 deletions
diff --git a/sugar/p2p/network.py b/sugar/p2p/network.py
index 6718669..e5b4e4b 100644
--- a/sugar/p2p/network.py
+++ b/sugar/p2p/network.py
@@ -35,347 +35,347 @@ RESULT_SUCCESS = 1
__authinfos = {}
def _add_authinfo(authinfo):
- __authinfos[threading.currentThread()] = authinfo
+ __authinfos[threading.currentThread()] = authinfo
def get_authinfo():
- return __authinfos.get(threading.currentThread())
+ return __authinfos.get(threading.currentThread())
def _del_authinfo():
- del __authinfos[threading.currentThread()]
+ del __authinfos[threading.currentThread()]
class GlibTCPServer(SocketServer.TCPServer):
- """GlibTCPServer
+ """GlibTCPServer
- Integrate socket accept into glib mainloop.
- """
+ Integrate socket accept into glib mainloop.
+ """
- allow_reuse_address = True
- request_queue_size = 20
+ allow_reuse_address = True
+ request_queue_size = 20
- def __init__(self, server_address, RequestHandlerClass):
- SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
- self.socket.setblocking(0) # Set nonblocking
+ def __init__(self, server_address, RequestHandlerClass):
+ SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
+ self.socket.setblocking(0) # Set nonblocking
- # Watch the listener socket for data
- gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
+ # Watch the listener socket for data
+ gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
- def _handle_accept(self, source, condition):
- """Process incoming data on the server's socket by doing an accept()
- via handle_request()."""
- if not (condition & gobject.IO_IN):
- return True
- self.handle_request()
- return True
+ def _handle_accept(self, source, condition):
+ """Process incoming data on the server's socket by doing an accept()
+ via handle_request()."""
+ if not (condition & gobject.IO_IN):
+ return True
+ self.handle_request()
+ return True
class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
- """ GlibXMLRPCRequestHandler
-
- The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
- the client's address and/or SSL certificate into the function that actually
- _processes_ the request. So we have to store it in a thread-indexed dict.
- """
-
- def do_POST(self):
- _add_authinfo(self.client_address)
- try:
- SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
- except socket.timeout:
- pass
- except socket.error, e:
- print "Error (%s): socket error - '%s'" % (self.client_address, e)
- except:
- print "Error while processing POST:"
- traceback.print_exc()
- _del_authinfo()
+ """ GlibXMLRPCRequestHandler
+
+ The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
+ the client's address and/or SSL certificate into the function that actually
+ _processes_ the request. So we have to store it in a thread-indexed dict.
+ """
+
+ def do_POST(self):
+ _add_authinfo(self.client_address)
+ try:
+ SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
+ except socket.timeout:
+ pass
+ except socket.error, e:
+ print "Error (%s): socket error - '%s'" % (self.client_address, e)
+ except:
+ print "Error while processing POST:"
+ traceback.print_exc()
+ _del_authinfo()
class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
- """GlibXMLRPCServer
-
- Use nonblocking sockets and handle the accept via glib rather than
- blocking on accept().
- """
-
- def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=0):
- self.logRequests = logRequests
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
- GlibTCPServer.__init__(self, addr, requestHandler)
-
- def _marshaled_dispatch(self, data, dispatch_method = None):
- """Dispatches an XML-RPC method from marshalled (XML) data.
-
- XML-RPC methods are dispatched from the marshalled (XML) data
- using the _dispatch method and the result is returned as
- marshalled data. For backwards compatibility, a dispatch
- function can be provided as an argument (see comment in
- SimpleXMLRPCRequestHandler.do_POST) but overriding the
- existing method through subclassing is the prefered means
- of changing method dispatch behavior.
- """
-
- params, method = xmlrpclib.loads(data)
-
- # generate response
- try:
- if dispatch_method is not None:
- response = dispatch_method(method, params)
- else:
- response = self._dispatch(method, params)
- # wrap response in a singleton tuple
- response = (response,)
- response = xmlrpclib.dumps(response, methodresponse=1)
- except xmlrpclib.Fault, fault:
- response = xmlrpclib.dumps(fault)
- except:
- print "Exception while processing request:"
- traceback.print_exc()
-
- # report exception back to server
- response = xmlrpclib.dumps(
- xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
- )
-
- return response
+ """GlibXMLRPCServer
+
+ Use nonblocking sockets and handle the accept via glib rather than
+ blocking on accept().
+ """
+
+ def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=0):
+ self.logRequests = logRequests
+ SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
+ GlibTCPServer.__init__(self, addr, requestHandler)
+
+ def _marshaled_dispatch(self, data, dispatch_method = None):
+ """Dispatches an XML-RPC method from marshalled (XML) data.
+
+ XML-RPC methods are dispatched from the marshalled (XML) data
+ using the _dispatch method and the result is returned as
+ marshalled data. For backwards compatibility, a dispatch
+ function can be provided as an argument (see comment in
+ SimpleXMLRPCRequestHandler.do_POST) but overriding the
+ existing method through subclassing is the prefered means
+ of changing method dispatch behavior.
+ """
+
+ params, method = xmlrpclib.loads(data)
+
+ # generate response
+ try:
+ if dispatch_method is not None:
+ response = dispatch_method(method, params)
+ else:
+ response = self._dispatch(method, params)
+ # wrap response in a singleton tuple
+ response = (response,)
+ response = xmlrpclib.dumps(response, methodresponse=1)
+ except xmlrpclib.Fault, fault:
+ response = xmlrpclib.dumps(fault)
+ except:
+ print "Exception while processing request:"
+ traceback.print_exc()
+
+ # report exception back to server
+ response = xmlrpclib.dumps(
+ xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
+ )
+
+ return response
class GlibHTTP(httplib.HTTP):
- """Subclass HTTP so we can return it's connection class' socket."""
- def connect(self, host=None, port=None):
- httplib.HTTP.connect(self, host, port)
- self._conn.sock.setblocking(0)
- def get_sock(self):
- return self._conn.sock
+ """Subclass HTTP so we can return it's connection class' socket."""
+ def connect(self, host=None, port=None):
+ httplib.HTTP.connect(self, host, port)
+ self._conn.sock.setblocking(0)
+ def get_sock(self):
+ return self._conn.sock
class GlibXMLRPCTransport(xmlrpclib.Transport):
- """Integrate the request with the glib mainloop rather than blocking."""
- ##
- # Connect to server.
- #
- # @param host Target host.
- # @return A connection handle.
-
- def __init__(self):
- pass
-
- def make_connection(self, host):
- """Use our own connection object so we can get its socket."""
- # create a HTTP connection object from a host descriptor
- host, extra_headers, x509 = self.get_host_info(host)
- return GlibHTTP(host)
-
- ##
- # Send a complete request, and parse the response.
- #
- # @param host Target host.
- # @param handler Target PRC handler.
- # @param request_body XML-RPC request body.
- # @param verbose Debugging flag.
- # @return Parsed response.
-
- def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None):
- """Do the first half of the request by sending data to the remote
- server. The bottom half bits get run when the remote server's response
- actually comes back."""
- # issue XML-RPC request
-
- h = self.make_connection(host)
- if verbose:
- h.set_debuglevel(1)
-
- self.send_request(h, handler, request_body)
- self.send_host(h, host)
- self.send_user_agent(h)
- self.send_content(h, request_body)
-
- # Schedule a GIOWatch so we don't block waiting for the response
- gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request,
- h, host, handler, verbose, request_cb, user_data)
-
- def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data):
- """Parse and return response when the remote server actually returns it."""
- if not (condition & gobject.IO_IN):
- return True
-
- try:
- errcode, errmsg, headers = h.getreply()
- except socket.error, err:
- if err[0] != 104:
- raise socket.error(err)
- else:
- gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
- return False
-
- if errcode != 200:
- raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)
- self.verbose = verbose
- response = self._parse_response(h.getfile(), h.get_sock())
- if request_cb:
- if len(response) == 1:
- response = response[0]
- gobject.idle_add(request_cb, RESULT_SUCCESS, response, user_data)
- return False
+ """Integrate the request with the glib mainloop rather than blocking."""
+ ##
+ # Connect to server.
+ #
+ # @param host Target host.
+ # @return A connection handle.
+
+ def __init__(self):
+ pass
+
+ def make_connection(self, host):
+ """Use our own connection object so we can get its socket."""
+ # create a HTTP connection object from a host descriptor
+ host, extra_headers, x509 = self.get_host_info(host)
+ return GlibHTTP(host)
+
+ ##
+ # Send a complete request, and parse the response.
+ #
+ # @param host Target host.
+ # @param handler Target PRC handler.
+ # @param request_body XML-RPC request body.
+ # @param verbose Debugging flag.
+ # @return Parsed response.
+
+ def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None):
+ """Do the first half of the request by sending data to the remote
+ server. The bottom half bits get run when the remote server's response
+ actually comes back."""
+ # issue XML-RPC request
+
+ h = self.make_connection(host)
+ if verbose:
+ h.set_debuglevel(1)
+
+ self.send_request(h, handler, request_body)
+ self.send_host(h, host)
+ self.send_user_agent(h)
+ self.send_content(h, request_body)
+
+ # Schedule a GIOWatch so we don't block waiting for the response
+ gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request,
+ h, host, handler, verbose, request_cb, user_data)
+
+ def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data):
+ """Parse and return response when the remote server actually returns it."""
+ if not (condition & gobject.IO_IN):
+ return True
+
+ try:
+ errcode, errmsg, headers = h.getreply()
+ except socket.error, err:
+ if err[0] != 104:
+ raise socket.error(err)
+ else:
+ gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
+ return False
+
+ if errcode != 200:
+ raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)
+ self.verbose = verbose
+ response = self._parse_response(h.getfile(), h.get_sock())
+ if request_cb:
+ if len(response) == 1:
+ response = response[0]
+ gobject.idle_add(request_cb, RESULT_SUCCESS, response, user_data)
+ return False
class _Method:
- """Right, so python people thought it would be funny to make this
- class private to xmlrpclib.py..."""
- # some magic to bind an XML-RPC method to an RPC server.
- # supports "nested" methods (e.g. examples.getStateName)
- def __init__(self, send, name):
- self.__send = send
- self.__name = name
- def __getattr__(self, name):
- return _Method(self.__send, "%s.%s" % (self.__name, name))
- def __call__(self, request_cb, user_data, *args):
- return self.__send(self.__name, request_cb, user_data, args)
+ """Right, so python people thought it would be funny to make this
+ class private to xmlrpclib.py..."""
+ # some magic to bind an XML-RPC method to an RPC server.
+ # supports "nested" methods (e.g. examples.getStateName)
+ def __init__(self, send, name):
+ self.__send = send
+ self.__name = name
+ def __getattr__(self, name):
+ return _Method(self.__send, "%s.%s" % (self.__name, name))
+ def __call__(self, request_cb, user_data, *args):
+ return self.__send(self.__name, request_cb, user_data, args)
class GlibServerProxy(xmlrpclib.ServerProxy):
- """Subclass xmlrpclib.ServerProxy so we can run the XML-RPC request
- in two parts, integrated with the glib mainloop, such that we don't
- block anywhere.
-
- Using this object is somewhat special; it requires more arguments to each
- XML-RPC request call than the normal xmlrpclib.ServerProxy object:
-
- client = GlibServerProxy("http://127.0.0.1:8888")
- user_data = "bar"
- xmlrpc_arg1 = "test"
- xmlrpc_arg2 = "foo"
- client.test(xmlrpc_test_cb, user_data, xmlrpc_arg1, xmlrpc_arg2)
-
- Here, 'xmlrpc_test_cb' is the callback function, which has the following
- signature:
-
- def xmlrpc_test_cb(result_status, response, user_data=None):
- ...
- """
- def __init__(self, uri, encoding=None, verbose=0, allow_none=0):
- self._transport = GlibXMLRPCTransport()
- self._encoding = encoding
- self._verbose = verbose
- self._allow_none = allow_none
- xmlrpclib.ServerProxy.__init__(self, uri, self._transport, encoding, verbose, allow_none)
-
- # get the url
- import urllib
- urltype, uri = urllib.splittype(uri)
- if urltype not in ("http", "https"):
- raise IOError, "unsupported XML-RPC protocol"
- self._host, self._handler = urllib.splithost(uri)
- if not self._handler:
- self._handler = "/RPC2"
-
- def __request(self, methodname, request_cb, user_data, params):
- """Call the method on the remote server. We just start the request here
- and the transport itself takes care of scheduling the response callback
- when the remote server returns the response. We don't want to block anywhere."""
-
- request = xmlrpclib.dumps(params, methodname, encoding=self._encoding,
- allow_none=self._allow_none)
-
- try:
- response = self._transport.start_request(
- self._host,
- self._handler,
- request,
- verbose=self._verbose,
- request_cb=request_cb,
- user_data=user_data
- )
- except socket.error, exc:
- gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
-
- def __getattr__(self, name):
- # magic method dispatcher
- return _Method(self.__request, name)
+ """Subclass xmlrpclib.ServerProxy so we can run the XML-RPC request
+ in two parts, integrated with the glib mainloop, such that we don't
+ block anywhere.
+
+ Using this object is somewhat special; it requires more arguments to each
+ XML-RPC request call than the normal xmlrpclib.ServerProxy object:
+
+ client = GlibServerProxy("http://127.0.0.1:8888")
+ user_data = "bar"
+ xmlrpc_arg1 = "test"
+ xmlrpc_arg2 = "foo"
+ client.test(xmlrpc_test_cb, user_data, xmlrpc_arg1, xmlrpc_arg2)
+
+ Here, 'xmlrpc_test_cb' is the callback function, which has the following
+ signature:
+
+ def xmlrpc_test_cb(result_status, response, user_data=None):
+ ...
+ """
+ def __init__(self, uri, encoding=None, verbose=0, allow_none=0):
+ self._transport = GlibXMLRPCTransport()
+ self._encoding = encoding
+ self._verbose = verbose
+ self._allow_none = allow_none
+ xmlrpclib.ServerProxy.__init__(self, uri, self._transport, encoding, verbose, allow_none)
+
+ # get the url
+ import urllib
+ urltype, uri = urllib.splittype(uri)
+ if urltype not in ("http", "https"):
+ raise IOError, "unsupported XML-RPC protocol"
+ self._host, self._handler = urllib.splithost(uri)
+ if not self._handler:
+ self._handler = "/RPC2"
+
+ def __request(self, methodname, request_cb, user_data, params):
+ """Call the method on the remote server. We just start the request here
+ and the transport itself takes care of scheduling the response callback
+ when the remote server returns the response. We don't want to block anywhere."""
+
+ request = xmlrpclib.dumps(params, methodname, encoding=self._encoding,
+ allow_none=self._allow_none)
+
+ try:
+ response = self._transport.start_request(
+ self._host,
+ self._handler,
+ request,
+ verbose=self._verbose,
+ request_cb=request_cb,
+ user_data=user_data
+ )
+ except socket.error, exc:
+ gobject.idle_add(request_cb, RESULT_FAILED, None, user_data)
+
+ def __getattr__(self, name):
+ # magic method dispatcher
+ return _Method(self.__request, name)
class GroupServer(object):
- _MAX_MSG_SIZE = 500
+ _MAX_MSG_SIZE = 500
- def __init__(self, address, port, data_cb):
- self._address = address
- self._port = port
- self._data_cb = data_cb
+ def __init__(self, address, port, data_cb):
+ self._address = address
+ self._port = port
+ self._data_cb = data_cb
- self._setup_listener()
+ self._setup_listener()
- def _setup_listener(self):
- # Listener socket
- self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ def _setup_listener(self):
+ # Listener socket
+ self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # Set some options to make it multicast-friendly
- self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
+ # Set some options to make it multicast-friendly
+ self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
+ self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
- def start(self):
- # Set some more multicast options
- self._listen_sock.bind(('', self._port))
- self._listen_sock.settimeout(2)
- intf = socket.gethostbyname(socket.gethostname())
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
+ def start(self):
+ # Set some more multicast options
+ self._listen_sock.bind(('', self._port))
+ self._listen_sock.settimeout(2)
+ intf = socket.gethostbyname(socket.gethostname())
+ self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
+ self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
- # Watch the listener socket for data
- gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
+ # Watch the listener socket for data
+ gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
- def _handle_incoming_data(self, source, condition):
- if not (condition & gobject.IO_IN):
- return True
- msg = {}
- msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
- if self._data_cb:
- self._data_cb(msg)
- return True
+ def _handle_incoming_data(self, source, condition):
+ if not (condition & gobject.IO_IN):
+ return True
+ msg = {}
+ msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
+ if self._data_cb:
+ self._data_cb(msg)
+ return True
class GroupClient(object):
- _MAX_MSG_SIZE = 500
+ _MAX_MSG_SIZE = 500
- def __init__(self, address, port):
- self._address = address
- self._port = port
+ def __init__(self, address, port):
+ self._address = address
+ self._port = port
- self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # Make the socket multicast-aware, and set TTL.
- self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
+ self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ # Make the socket multicast-aware, and set TTL.
+ self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
- def send_msg(self, data):
- self._send_sock.sendto(data, (self._address, self._port))
+ def send_msg(self, data):
+ self._send_sock.sendto(data, (self._address, self._port))
class Test(object):
- def test(self, arg1):
- print "Request got %s" % arg1
- return "success"
+ def test(self, arg1):
+ print "Request got %s" % arg1
+ return "success"
def xmlrpc_test_cb(response, user_data=None):
- print "Response was %s, user_data was %s" % (response, user_data)
- import gtk
- gtk.main_quit()
+ print "Response was %s, user_data was %s" % (response, user_data)
+ import gtk
+ gtk.main_quit()
def xmlrpc_test():
- client = GlibServerProxy("http://127.0.0.1:8888")
- client.test(xmlrpc_test_cb, "bar", "test data")
+ client = GlibServerProxy("http://127.0.0.1:8888")
+ client.test(xmlrpc_test_cb, "bar", "test data")
def main():
- import gtk
- server = GlibXMLRPCServer(("", 8888))
- inst = Test()
- server.register_instance(inst)
-
- gobject.idle_add(xmlrpc_test)
-
- try:
- gtk.main()
- except KeyboardInterrupt:
- print 'Ctrl+C pressed, exiting...'
- print "Done."
+ import gtk
+ server = GlibXMLRPCServer(("", 8888))
+ inst = Test()
+ server.register_instance(inst)
+
+ gobject.idle_add(xmlrpc_test)
+
+ try:
+ gtk.main()
+ except KeyboardInterrupt:
+ print 'Ctrl+C pressed, exiting...'
+ print "Done."
if __name__ == "__main__":
- main()
+ main()