Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p/Stream.py
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2006-05-23 04:15:14 (GMT)
committer Dan Williams <dcbw@redhat.com>2006-05-23 04:15:14 (GMT)
commit6cc5d749f61f8e68de946678a9c5981700a3af49 (patch)
treeda964b3b8604ea01d2a0bb7909bab948ecf4df7f /sugar/p2p/Stream.py
parent8e7a72c9da2d3802e9d4ac1f2c6b6af28f21c4a7 (diff)
- Implement async XML-RPC client
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r--sugar/p2p/Stream.py84
1 files changed, 11 insertions, 73 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py
index ef95d60..7f00c57 100644
--- a/sugar/p2p/Stream.py
+++ b/sugar/p2p/Stream.py
@@ -1,7 +1,6 @@
import xmlrpclib
import socket
import traceback
-import threading
import pygtk
pygtk.require('2.0')
@@ -38,7 +37,7 @@ class Stream(object):
self._callback(self._group.get_buddy(nick_name), data)
-class UnicastStreamWriterBase(object):
+class UnicastStreamWriter(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
if not service:
@@ -48,84 +47,26 @@ class UnicastStreamWriterBase(object):
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
+ self._writer = network.GlibServerProxy(self._xmlrpc_addr)
-class UnicastStreamWriter(UnicastStreamWriterBase):
- def __init__(self, stream, service, owner_nick_name):
- UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
- self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
-
- def write(self, data):
+ def write(self, xmlrpc_data):
"""Write some data to the default endpoint of this pipe on the remote server."""
try:
- self._writer.message(self._owner_nick_name, data)
+ self._writer.message(None, None, self._owner_nick_name, xmlrpc_data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
return False
- def custom_request(self, method_name, *args):
+ def custom_request(self, method_name, request_cb, user_data, *args):
"""Call a custom XML-RPC method on the remote server."""
try:
method = getattr(self._writer, method_name)
- return method(*args)
- except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
- pass
- #traceback.print_exc()
- return None
-
-
-class ThreadedRequest(threading.Thread):
- def __init__(self, controller, addr, method, response_cb, user_data, *args):
- threading.Thread.__init__(self)
- self._controller = controller
- self._method = method
- self._args = args
- self._response_cb = response_cb
- self._user_data = user_data
- self._writer = xmlrpclib.ServerProxy(addr)
-
- def run(self):
- response = None
- try:
- method = getattr(self._writer, self._method)
- response = method(*self._args)
+ method(request_cb, user_data, *args)
+ return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
- if self._response_cb:
- gobject.idle_add(self._response_cb, response, self._user_data)
- self._controller.notify_request_done(self)
-
-class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
- def __init__(self, stream, service, owner_nick_name):
- self._requests_lock = threading.Lock()
- self._requests = []
- UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
-
- def _add_request(self, request):
- self._requests_lock.acquire()
- if not request in self._requests:
- self._requests.append(request)
- self._requests_lock.release()
-
- def write(self, response_cb, user_data, data):
- """Write some data to the default endpoint of this pipe on the remote server."""
- request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
- user_data, self._owner_nick_name, data)
- self._add_request(request)
- request.start()
-
- def custom_request(self, method_name, response_cb, user_data, *args):
- """Call a custom XML-RPC method on the remote server."""
- request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
- user_data, *args)
- self._add_request(request)
- request.start()
-
- def notify_request_done(self, request):
- self._requests_lock.acquire()
- if request in self._requests:
- self._requests.remove(request)
- self._requests_lock.release()
+ return False
class UnicastStream(Stream):
@@ -159,11 +100,8 @@ class UnicastStream(Stream):
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
- def new_writer(self, service, threaded=False):
- if threaded:
- return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
- else:
- return UnicastStreamWriter(self, service, self._owner_nick_name)
+ def new_writer(self, service):
+ return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
@@ -183,5 +121,5 @@ class MulticastStream(Stream):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
- def new_writer(self, service=None, threaded=False):
+ def new_writer(self, service=None):
return self