diff options
author | Marco Pesenti Gritti <mpg@redhat.com> | 2006-05-22 06:11:39 (GMT) |
---|---|---|
committer | Marco Pesenti Gritti <mpg@redhat.com> | 2006-05-22 06:11:39 (GMT) |
commit | f5173c33d07c572644e7769cc5e46d0217b529c1 (patch) | |
tree | 073e189e0c2e655c321e43e628e71087e599d1cc /sugar/p2p/Stream.py | |
parent | d3c5da1a667c381346879929cf5471df85d72d23 (diff) |
Merge
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r-- | sugar/p2p/Stream.py | 75 |
1 files changed, 71 insertions, 4 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py index e6c1a91..2467f5a 100644 --- a/sugar/p2p/Stream.py +++ b/sugar/p2p/Stream.py @@ -1,6 +1,12 @@ import xmlrpclib import socket import traceback +import threading + +import pygtk +pygtk.require('2.0') +import gobject + import network from MostlyReliablePipe import MostlyReliablePipe @@ -32,7 +38,7 @@ class Stream(object): self._callback(self._group.get_buddy(nick_name), data) -class UnicastStreamWriter(object): +class UnicastStreamWriterBase(object): def __init__(self, stream, service, owner_nick_name): # set up the writer if not service: @@ -42,6 +48,10 @@ class UnicastStreamWriter(object): self._address = self._service.get_address() self._port = self._service.get_port() self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + +class UnicastStreamWriter(UnicastStreamWriterBase): + def __init__(self, stream, service, owner_nick_name): + UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) def write(self, data): @@ -63,6 +73,60 @@ class UnicastStreamWriter(object): return None +class ThreadedRequest(threading.Thread): + def __init__(self, controller, addr, method, response_cb, user_data, *args): + threading.Thread.__init__(self) + self._controller = controller + self._method = method + self._args = args + self._response_cb = response_cb + self._user_data = user_data + self._writer = xmlrpclib.ServerProxy(addr) + + def run(self): + response = None + try: + method = getattr(self._writer, self._method) + response = method(*self._args) + except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): + traceback.print_exc() + if self._response_cb: + gobject.idle_add(self._response_cb, response, self._user_data) + self._controller.notify_request_done(self) + +class ThreadedUnicastStreamWriter(UnicastStreamWriterBase): + def __init__(self, stream, service, owner_nick_name): + self._requests_lock = threading.Lock() + self._requests = [] + UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name) + + def _add_request(self, request): + self._requests_lock.acquire() + if not request in self._requests: + self._requests.append(request) + self._requests_lock.release() + + def write(self, response_cb, user_data, data): + """Write some data to the default endpoint of this pipe on the remote server.""" + request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb, + user_data, self._owner_nick_name, data) + self._add_request(request) + request.start() + + def custom_request(self, method_name, response_cb, user_data, *args): + """Call a custom XML-RPC method on the remote server.""" + request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb, + user_data, *args) + self._add_request(request) + request.start() + + def notify_request_done(self, request): + self._requests_lock.acquire() + if request in self._requests: + self._requests.remove(request) + self._requests_lock.release() + + class UnicastStream(Stream): def __init__(self, service, group): Stream.__init__(self, service, group) @@ -94,8 +158,11 @@ class UnicastStream(Stream): raise ValueError("Handler name 'message' is a reserved handler.") self._reader.register_function(handler, name) - def new_writer(self, service): - return UnicastStreamWriter(self, service, self._owner_nick_name) + def new_writer(self, service, threaded=False): + if threaded: + return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name) + else: + return UnicastStreamWriter(self, service, self._owner_nick_name) class MulticastStream(Stream): @@ -115,5 +182,5 @@ class MulticastStream(Stream): [ nick_name, data ] = data.split(" |**| ", 2) self.recv(nick_name, data) - def new_writer(self, service=None): + def new_writer(self, service=None, threaded=False): return self |