Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p/Stream.py
diff options
context:
space:
mode:
authorMarco Pesenti Gritti <mpg@redhat.com>2006-05-22 06:11:39 (GMT)
committer Marco Pesenti Gritti <mpg@redhat.com>2006-05-22 06:11:39 (GMT)
commitf5173c33d07c572644e7769cc5e46d0217b529c1 (patch)
tree073e189e0c2e655c321e43e628e71087e599d1cc /sugar/p2p/Stream.py
parentd3c5da1a667c381346879929cf5471df85d72d23 (diff)
Merge
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r--sugar/p2p/Stream.py75
1 files changed, 71 insertions, 4 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py
index e6c1a91..2467f5a 100644
--- a/sugar/p2p/Stream.py
+++ b/sugar/p2p/Stream.py
@@ -1,6 +1,12 @@
import xmlrpclib
import socket
import traceback
+import threading
+
+import pygtk
+pygtk.require('2.0')
+import gobject
+
import network
from MostlyReliablePipe import MostlyReliablePipe
@@ -32,7 +38,7 @@ class Stream(object):
self._callback(self._group.get_buddy(nick_name), data)
-class UnicastStreamWriter(object):
+class UnicastStreamWriterBase(object):
def __init__(self, stream, service, owner_nick_name):
# set up the writer
if not service:
@@ -42,6 +48,10 @@ class UnicastStreamWriter(object):
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
+
+class UnicastStreamWriter(UnicastStreamWriterBase):
+ def __init__(self, stream, service, owner_nick_name):
+ UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
def write(self, data):
@@ -63,6 +73,60 @@ class UnicastStreamWriter(object):
return None
+class ThreadedRequest(threading.Thread):
+ def __init__(self, controller, addr, method, response_cb, user_data, *args):
+ threading.Thread.__init__(self)
+ self._controller = controller
+ self._method = method
+ self._args = args
+ self._response_cb = response_cb
+ self._user_data = user_data
+ self._writer = xmlrpclib.ServerProxy(addr)
+
+ def run(self):
+ response = None
+ try:
+ method = getattr(self._writer, self._method)
+ response = method(*self._args)
+ except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
+ traceback.print_exc()
+ if self._response_cb:
+ gobject.idle_add(self._response_cb, response, self._user_data)
+ self._controller.notify_request_done(self)
+
+class ThreadedUnicastStreamWriter(UnicastStreamWriterBase):
+ def __init__(self, stream, service, owner_nick_name):
+ self._requests_lock = threading.Lock()
+ self._requests = []
+ UnicastStreamWriterBase.__init__(self, stream, service, owner_nick_name)
+
+ def _add_request(self, request):
+ self._requests_lock.acquire()
+ if not request in self._requests:
+ self._requests.append(request)
+ self._requests_lock.release()
+
+ def write(self, response_cb, user_data, data):
+ """Write some data to the default endpoint of this pipe on the remote server."""
+ request = ThreadedRequest(self, self._xmlrpc_addr, "message", response_cb,
+ user_data, self._owner_nick_name, data)
+ self._add_request(request)
+ request.start()
+
+ def custom_request(self, method_name, response_cb, user_data, *args):
+ """Call a custom XML-RPC method on the remote server."""
+ request = ThreadedRequest(self, self._xmlrpc_addr, method_name, response_cb,
+ user_data, *args)
+ self._add_request(request)
+ request.start()
+
+ def notify_request_done(self, request):
+ self._requests_lock.acquire()
+ if request in self._requests:
+ self._requests.remove(request)
+ self._requests_lock.release()
+
+
class UnicastStream(Stream):
def __init__(self, service, group):
Stream.__init__(self, service, group)
@@ -94,8 +158,11 @@ class UnicastStream(Stream):
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
- def new_writer(self, service):
- return UnicastStreamWriter(self, service, self._owner_nick_name)
+ def new_writer(self, service, threaded=False):
+ if threaded:
+ return ThreadedUnicastStreamWriter(self, service, self._owner_nick_name)
+ else:
+ return UnicastStreamWriter(self, service, self._owner_nick_name)
class MulticastStream(Stream):
@@ -115,5 +182,5 @@ class MulticastStream(Stream):
[ nick_name, data ] = data.split(" |**| ", 2)
self.recv(nick_name, data)
- def new_writer(self, service=None):
+ def new_writer(self, service=None, threaded=False):
return self