diff options
author | Dan Williams <dcbw@redhat.com> | 2006-05-23 04:15:14 (GMT) |
---|---|---|
committer | Dan Williams <dcbw@redhat.com> | 2006-05-23 04:15:14 (GMT) |
commit | 6cc5d749f61f8e68de946678a9c5981700a3af49 (patch) | |
tree | da964b3b8604ea01d2a0bb7909bab948ecf4df7f /sugar/p2p/Stream.py | |
parent | 8e7a72c9da2d3802e9d4ac1f2c6b6af28f21c4a7 (diff) |
- Implement async XML-RPC client
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r-- | sugar/p2p/Stream.py | 84 |
1 files changed, 11 insertions, 73 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py index ef95d60..7f00c57 100644 --- a/sugar/p2p/Stream.py +++ b/sugar/p2p/Stream.py @@ -1,7 +1,6 @@ import xmlrpclib import socket import traceback -import threading import pygtk pygtk.require('2.0') @@ -38,7 +37,7 @@ class Stream(object): self._callback(self._group.get_buddy(nick_name), data) -class UnicastStreamWriterBase(object): +class UnicastStreamWriter(object): def __init__(self, stream, service, owner_nick_name): # set up the writer if not service: @@ -48,84 +47,26 @@ class UnicastStreamWriterBase(object): self._address = self._service.get_address() self._port = self._service.get_port() self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + self._writer = network.GlibServerProxy(self._xmlrpc_addr) -class UnicastStreamWriter(UnicastStreamWriterBase): - def __init__(self, stream, service, owner_nick_name): - UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) - self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) - - def write(self, data): + def write(self, xmlrpc_data): """Write some data to the default endpoint of this pipe on the remote server.""" try: - self._writer.message(self._owner_nick_name, data) + self._writer.message(None, None, self._owner_nick_name, xmlrpc_data) return True except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): traceback.print_exc() return False - def custom_request(self, method_name, *args): + def custom_request(self, method_name, request_cb, user_data, *args): """Call a custom XML-RPC method on the remote server.""" try: method = getattr(self._writer, method_name) - return method(*args) - except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): - pass - #traceback.print_exc() - return None - - -class ThreadedRequest(threading.Thread): - def __init__(self, controller, addr, method, response_cb, user_data, *args): - threading.Thread.__init__(self) - self._controller = controller - self._method = method - self._args = args - self._response_cb = response_cb - self._user_data = user_data - self._writer = xmlrpclib.ServerProxy(addr) - - def run(self): - response = None - try: - method = getattr(self._writer, self._method) - response = method(*self._args) + method(request_cb, user_data, *args) + return True except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): traceback.print_exc() - if self._response_cb: - gobject.idle_add(self._response_cb, response, self._user_data) - self._controller.notify_request_done(self) - -class ThreadedUnicastStreamWriter(UnicastStreamWriterBase): - def __init__(self, stream, service, owner_nick_name): - self._requests_lock = threading.Lock() - self._requests = [] - UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) - - def _add_request(self, request): - self._requests_lock.acquire() - if not request in self._requests: - self._requests.append(request) - self._requests_lock.release() - - def write(self, response_cb, user_data, data): - """Write some data to the default endpoint of this pipe on the remote server.""" - request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb, - user_data, self._owner_nick_name, data) - self._add_request(request) - request.start() - - def custom_request(self, method_name, response_cb, user_data, *args): - """Call a custom XML-RPC method on the remote server.""" - request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb, - user_data, *args) - self._add_request(request) - request.start() - - def notify_request_done(self, request): - self._requests_lock.acquire() - if request in self._requests: - self._requests.remove(request) - self._requests_lock.release() + return False class UnicastStream(Stream): @@ -159,11 +100,8 @@ class UnicastStream(Stream): raise ValueError("Handler name 'message' is a reserved handler.") self._reader.register_function(handler, name) - def new_writer(self, service, threaded=False): - if threaded: - return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name) - else: - return UnicastStreamWriter(self, service, self._owner_nick_name) + def new_writer(self, service): + return UnicastStreamWriter(self, service, self._owner_nick_name) class MulticastStream(Stream): @@ -183,5 +121,5 @@ class MulticastStream(Stream): [ nick_name, data ] = data.split(" |**| ", 2) self.recv(nick_name, data) - def new_writer(self, service=None, threaded=False): + def new_writer(self, service=None): return self |