Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2006-05-23 04:15:14 (GMT)
committer Dan Williams <dcbw@redhat.com>2006-05-23 04:15:14 (GMT)
commit6cc5d749f61f8e68de946678a9c5981700a3af49 (patch)
treeda964b3b8604ea01d2a0bb7909bab948ecf4df7f /sugar/p2p
parent8e7a72c9da2d3802e9d4ac1f2c6b6af28f21c4a7 (diff)
- Implement async XML-RPC client
Diffstat (limited to 'sugar/p2p')
-rw-r--r--sugar/p2p/Stream.py84
-rw-r--r--sugar/p2p/network.py175
2 files changed, 185 insertions, 74 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py
index ef95d60..7f00c57 100644
--- a/sugar/p2p/Stream.py
+++ b/sugar/p2p/Stream.py
@@ -1,7 +1,6 @@
import xmlrpclib
import socket
import traceback
-import threading
import pygtk
pygtk.require('2.0')
@@ -38,7 +37,7 @@ class Stream(object):
self._callback(self._group.get_buddy(nick_name), data)
-class UnicastStreamWriterBase(object):
+class UnicastStreamWriter(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
if not service:
@@ -48,84 +47,26 @@ class UnicastStreamWriterBase(object):
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
+ self._writer = network.GlibServerProxy(self._xmlrpc_addr)
-class UnicastStreamWriter(UnicastStreamWriterBase):
- def __init__(self, stream, service, owner_nick_name):
- UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
- self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
-
- def write(self, data):
+ def write(self, xmlrpc_data):
"""Write some data to the default endpoint of this pipe on the remote server."""
try:
- self._writer.message(self._owner_nick_name, data)
+ self._writer.message(None, None, self._owner_nick_name, xmlrpc_data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
return False
- def custom_request(self, method_name, *args):
+ def custom_request(self, method_name, request_cb, user_data, *args):
"""Call a custom XML-RPC method on the remote server."""
try:
method = getattr(self._writer, method_name)
- return method(*args)
- except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
- pass
- #traceback.print_exc()
- return None
-
-
-class ThreadedRequest(threading.Thread):
- def __init__(self, controller, addr, method, response_cb, user_data, *args):
- threading.Thread.__init__(self)
- self._controller = controller
- self._method = method
- self._args = args
- self._response_cb = response_cb
- self._user_data = user_data
- self._writer = xmlrpclib.ServerProxy(addr)
-
- def run(self):
- response = None
- try:
- method = getattr(self._writer, self._method)
- response = method(*self._args)
+ method(request_cb, user_data, *args)
+ return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
- if self._response_cb:
- gobject.idle_add(self._response_cb, response, self._user_data)
- self._controller.notify_request_done(self)
-
-class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
- def __init__(self, stream, service, owner_nick_name):
- self._requests_lock = threading.Lock()
- self._requests = []
- UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
-
- def _add_request(self, request):
- self._requests_lock.acquire()
- if not request in self._requests:
- self._requests.append(request)
- self._requests_lock.release()
-
- def write(self, response_cb, user_data, data):
- """Write some data to the default endpoint of this pipe on the remote server."""
- request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
- user_data, self._owner_nick_name, data)
- self._add_request(request)
- request.start()
-
- def custom_request(self, method_name, response_cb, user_data, *args):
- """Call a custom XML-RPC method on the remote server."""
- request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
- user_data, *args)
- self._add_request(request)
- request.start()
-
- def notify_request_done(self, request):
- self._requests_lock.acquire()
- if request in self._requests:
- self._requests.remove(request)
- self._requests_lock.release()
+ return False
class UnicastStream(Stream):
@@ -159,11 +100,8 @@ class UnicastStream(Stream):
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
- def new_writer(self, service, threaded=False):
- if threaded:
- return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
- else:
- return UnicastStreamWriter(self, service, self._owner_nick_name)
+ def new_writer(self, service):
+ return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
@@ -183,5 +121,5 @@ class MulticastStream(Stream):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
- def new_writer(self, service=None, threaded=False):
+ def new_writer(self, service=None):
return self
diff --git a/sugar/p2p/network.py b/sugar/p2p/network.py
index 4c054fe..684d114 100644
--- a/sugar/p2p/network.py
+++ b/sugar/p2p/network.py
@@ -3,11 +3,13 @@ import threading
import traceback
import xmlrpclib
import sys
+import httplib
import gobject
import SimpleXMLRPCServer
import SocketServer
+
__authinfos = {}
def _add_authinfo(authinfo):
@@ -37,6 +39,8 @@ class GlibTCPServer(SocketServer.TCPServer):
gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
def _handle_accept(self, source, condition):
+ """Process incoming data on the server's socket by doing an accept()
+ via handle_request()."""
if not (condition & gobject.IO_IN):
return True
self.handle_request()
@@ -72,7 +76,6 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher)
def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
self.logRequests = logRequests
-
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
GlibTCPServer.__init__(self, addr, requestHandler)
@@ -112,6 +115,141 @@ class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher)
return response
+
+class GlibHTTP(httplib.HTTP):
+ """Subclass HTTP so we can return it's connection class' socket."""
+ def get_sock(self):
+ return self._conn.sock
+
+class GlibXMLRPCTransport(xmlrpclib.Transport):
+ """Integrate the request with the glib mainloop rather than blocking."""
+ ##
+ # Connect to server.
+ #
+ # @param host Target host.
+ # @return A connection handle.
+
+ def make_connection(self, host):
+ """Use our own connection object so we can get its socket."""
+ # create a HTTP connection object from a host descriptor
+ host, extra_headers, x509 = self.get_host_info(host)
+ return GlibHTTP(host)
+
+ ##
+ # Send a complete request, and parse the response.
+ #
+ # @param host Target host.
+ # @param handler Target PRC handler.
+ # @param request_body XML-RPC request body.
+ # @param verbose Debugging flag.
+ # @return Parsed response.
+
+ def start_request(self, host, handler, request_body, verbose=0, request_cb=None, user_data=None):
+ """Do the first half of the request by sending data to the remote
+ server. The bottom half bits get run when the remote server's response
+ actually comes back."""
+ # issue XML-RPC request
+
+ h = self.make_connection(host)
+ if verbose:
+ h.set_debuglevel(1)
+
+ self.send_request(h, handler, request_body)
+ self.send_host(h, host)
+ self.send_user_agent(h)
+ self.send_content(h, request_body)
+
+ # Schedule a GIOWatch so we don't block waiting for the response
+ gobject.io_add_watch(h.get_sock(), gobject.IO_IN, self._finish_request,
+ h, host, handler, verbose, request_cb, user_data)
+
+ def _finish_request(self, source, condition, h, host, handler, verbose, request_cb, user_data):
+ """Parse and return response when the remote server actually returns it."""
+ if not (condition & gobject.IO_IN):
+ return True
+
+ errcode, errmsg, headers = h.getreply()
+ if errcode != 200:
+ raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)
+ self.verbose = verbose
+ response = self._parse_response(h.getfile(), h.get_sock())
+ if request_cb:
+ if len(response) == 1:
+ response = response[0]
+ gobject.idle_add(request_cb, response, user_data)
+
+class _Method:
+ """Right, so python people thought it would be funny to make this
+ class private to xmlrpclib.py..."""
+ # some magic to bind an XML-RPC method to an RPC server.
+ # supports "nested" methods (e.g. examples.getStateName)
+ def __init__(self, send, name):
+ self.__send = send
+ self.__name = name
+ def __getattr__(self, name):
+ return _Method(self.__send, "%s.%s" % (self.__name, name))
+ def __call__(self, request_cb, user_data, *args):
+ return self.__send(self.__name, request_cb, user_data, args)
+
+
+class GlibServerProxy(xmlrpclib.ServerProxy):
+ """Subclass xmlrpclib.ServerProxy so we can run the XML-RPC request
+ in two parts, integrated with the glib mainloop, such that we don't
+ block anywhere.
+
+ Using this object is somewhat special; it requires more arguments to each
+ XML-RPC request call than the normal xmlrpclib.ServerProxy object:
+
+ client = GlibServerProxy("http://127.0.0.1:8888")
+ user_data = "bar"
+ xmlrpc_arg1 = "test"
+ xmlrpc_arg2 = "foo"
+ client.test(xmlrpc_test_cb, user_data, xmlrpc_arg1, xmlrpc_arg2)
+
+ Here, 'xmlrpc_test_cb' is the callback function, which has the following
+ signature:
+
+ def xmlrpc_test_cb(response, user_data=None):
+ ...
+ """
+ def __init__(self, uri, encoding=None, verbose=0, allow_none=0):
+ self._transport = GlibXMLRPCTransport()
+ self._encoding = encoding
+ self._verbose = verbose
+ self._allow_none = allow_none
+ xmlrpclib.ServerProxy.__init__(self, uri, self._transport, encoding, verbose, allow_none)
+
+ # get the url
+ import urllib
+ type, uri = urllib.splittype(uri)
+ if type not in ("http", "https"):
+ raise IOError, "unsupported XML-RPC protocol"
+ self._host, self._handler = urllib.splithost(uri)
+ if not self._handler:
+ self._handler = "/RPC2"
+
+ def __request(self, methodname, request_cb, user_data, params):
+ """Call the method on the remote server. We just start the request here
+ and the transport itself takes care of scheduling the response callback
+ when the remote server returns the response. We don't want to block anywhere."""
+
+ request = xmlrpclib.dumps(params, methodname, encoding=self._encoding,
+ allow_none=self._allow_none)
+
+ response = self._transport.start_request(
+ self._host,
+ self._handler,
+ request,
+ verbose=self._verbose,
+ request_cb=request_cb,
+ user_data=user_data
+ )
+
+ def __getattr__(self, name):
+ # magic method dispatcher
+ return _Method(self.__request, name)
+
+
class GroupServer(object):
_MAX_MSG_SIZE = 500
@@ -166,3 +304,38 @@ class GroupClient(object):
def send_msg(self, data):
self._send_sock.sendto(data, (self._address, self._port))
+
+
+
+class Test(object):
+ def test(self, arg1):
+ print "Request got %s" % arg1
+ return "success"
+
+def xmlrpc_test_cb(response, user_data=None):
+ print "Response was %s, user_data was %s" % (response, user_data)
+ import gtk
+ gtk.main_quit()
+
+
+def xmlrpc_test():
+ client = GlibServerProxy("http://127.0.0.1:8888")
+ client.test(xmlrpc_test_cb, "bar", "test data")
+
+
+def main():
+ import gtk, gobject
+ server = GlibXMLRPCServer(("", 8888))
+ inst = Test()
+ server.register_instance(inst)
+
+ gobject.idle_add(xmlrpc_test)
+
+ try:
+ gtk.main()
+ except KeyboardInterrupt:
+ pass
+ print "Done."
+
+if __name__ == "__main__":
+ main()