diff options
author | Dan Williams <dcbw@redhat.com> | 2006-05-19 18:50:20 (GMT) |
---|---|---|
committer | Dan Williams <dcbw@redhat.com> | 2006-05-19 18:50:20 (GMT) |
commit | 9a1324d0b08af22044424f399edfd032a287e7d7 (patch) | |
tree | 4bc8d750a72fd9c3c256a0feef01a20a44f775eb /sugar/p2p/Stream.py | |
parent | 8cfc17ff5536a4f8135fbebfb791647ee23aec61 (diff) |
Replace StreamReader and StreamWriter with a unified Stream class. Since the
Diffstat (limited to 'sugar/p2p/Stream.py')
-rw-r--r-- | sugar/p2p/Stream.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py new file mode 100644 index 0000000..77d8945 --- /dev/null +++ b/sugar/p2p/Stream.py @@ -0,0 +1,99 @@ +import xmlrpclib +import socket + +import network +from MostlyReliablePipe import MostlyReliablePipe + +class Stream(object): + def __init__(self, service, group): + self._service = service + self._group = group + self._owner_nick_name = self._group.get_owner().get_nick_name() + self._port = self._service.get_port() + self._address = self._service.get_address() + self._callback = None + + def new_from_service(service, group): + if service.is_multicast(): + return MulticastStream(service, group) + else: + return UnicastStream(service, group) + new_from_service = staticmethod(new_from_service) + + def set_data_listener(self, callback): + self._callback = callback + + def recv(self, nick_name, data): + if nick_name != self._owner_nick_name: + if self._callback: + self._callback(self._group.get_buddy(nick_name), data) + + +class UnicastStreamWriter(object): + def __init__(self, stream, service, owner_nick_name): + # set up the writer + self._service = service + self._owner_nick_name = owner_nick_name + self._address = self._service.get_address() + self._port = self._service.get_port() + self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port) + self._writer = xmlrpclib.ServerProxy(self._xmlrpc_addr) + + def write(self, data): + try: + self._writer.message(self._owner_nick_name, data) + return True + except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError): + traceback.print_exc() + return False + + +class UnicastStream(Stream): + def __init__(self, service, group): + Stream.__init__(self, service, group) + self._setup() + + def _setup(self): + # Set up the reader + started = False + tries = 10 + port = self._service.get_port() + self._reader = None + while not started and tries > 0: + try: + self._reader = network.GlibXMLRPCServer(("", port)) + self._reader.register_instance(self) + started = True + except(socket.error): + port = port + 1 + tries = tries - 1 + self._service.set_port(port) + + def message(self, nick_name, message): + """Called by the XMLRPC server when network data arrives.""" + self.recv(nick_name, message) + return True + + def new_writer(self, service): + return UnicastStreamWriter(self, service, self._owner_nick_name) + + +class MulticastStream(Stream): + def __init__(self, service, group): + Stream.__init__(self, service, group) + self._address = self._service.get_group_address() + self._setup() + + def _setup(self): + self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb) + self._pipe.start() + + def write(self, data): + self._pipe.send(self._owner_nick_name + " |**| " + data) + + def _recv_data_cb(self, addr, data, user_data=None): + [ nick_name, data ] = data.split(" |**| ", 2) + self.recv(nick_name, data) + + def new_writer(self, service=None): + return self |