Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p/Stream.py
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2006-05-19 18:50:20 (GMT)
committer Dan Williams <dcbw@redhat.com>2006-05-19 18:50:20 (GMT)
commit9a1324d0b08af22044424f399edfd032a287e7d7 (patch)
tree4bc8d750a72fd9c3c256a0feef01a20a44f775eb /sugar/p2p/Stream.py
parent8cfc17ff5536a4f8135fbebfb791647ee23aec61 (diff)
Replace StreamReader and StreamWriter with a unified Stream class. Since the
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r--sugar/p2p/Stream.py99
1 files changed, 99 insertions, 0 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py
new file mode 100644
index 0000000..77d8945
--- /dev/null
+++ b/sugar/p2p/Stream.py
@@ -0,0 +1,99 @@
+import xmlrpclib
+import socket
+
+import network
+from MostlyReliablePipe import MostlyReliablePipe
+
+class Stream(object):
+ def __init__(self, service, group):
+ self._service = service
+ self._group = group
+ self._owner_nick_name = self._group.get_owner().get_nick_name()
+ self._port = self._service.get_port()
+ self._address = self._service.get_address()
+ self._callback = None
+
+ def new_from_service(service, group):
+ if service.is_multicast():
+ return MulticastStream(service, group)
+ else:
+ return UnicastStream(service, group)
+ new_from_service = staticmethod(new_from_service)
+
+ def set_data_listener(self, callback):
+ self._callback = callback
+
+ def recv(self, nick_name, data):
+ if nick_name != self._owner_nick_name:
+ if self._callback:
+ self._callback(self._group.get_buddy(nick_name), data)
+
+
+class UnicastStreamWriter(object):
+ def __init__(self, stream, service, owner_nick_name):
+ # set up the writer
+ self._service = service
+ self._owner_nick_name = owner_nick_name
+ self._address = self._service.get_address()
+ self._port = self._service.get_port()
+ self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
+ self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr)
+
+ def write(self, data):
+ try:
+ self._writer.message(self._owner_nick_name, data)
+ return True
+ except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
+ traceback.print_exc()
+ return False
+
+
+class UnicastStream(Stream):
+ def __init__(self, service, group):
+ Stream.__init__(self, service, group)
+ self._setup()
+
+ def _setup(self):
+ # Set up the reader
+ started = False
+ tries = 10
+ port = self._service.get_port()
+ self._reader = None
+ while not started and tries > 0:
+ try:
+ self._reader = network.GlibXMLRPCServer(("", port))
+ self._reader.register_instance(self)
+ started = True
+ except(socket.error):
+ port = port + 1
+ tries = tries - 1
+ self._service.set_port(port)
+
+ def message(self, nick_name, message):
+ """Called by the XMLRPC server when network data arrives."""
+ self.recv(nick_name, message)
+ return True
+
+ def new_writer(self, service):
+ return UnicastStreamWriter(self, service, self._owner_nick_name)
+
+
+class MulticastStream(Stream):
+ def __init__(self, service, group):
+ Stream.__init__(self, service, group)
+ self._address = self._service.get_group_address()
+ self._setup()
+
+ def _setup(self):
+ self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb)
+ self._pipe.start()
+
+ def write(self, data):
+ self._pipe.send(self._owner_nick_name + " |**| " + data)
+
+ def _recv_data_cb(self, addr, data, user_data=None):
+ [ nick_name, data ] = data.split(" |**| ", 2)
+ self.recv(nick_name, data)
+
+ def new_writer(self, service=None):
+ return self