Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2006-06-12 22:31:26 (GMT)
committer Dan Williams <dcbw@redhat.com>2006-06-12 22:31:26 (GMT)
commit17c371119dfff8285775e6cb69af97433595ac55 (patch)
tree7af4ec838dff1763217b84de26f9fb4129c5fa5d /sugar/p2p
parentd931dca5799ee1e88ce327bf28424f9739f4ad87 (diff)
More presence service rework
Diffstat (limited to 'sugar/p2p')
-rw-r--r--sugar/p2p/Stream.py112
1 files changed, 69 insertions, 43 deletions
diff --git a/sugar/p2p/Stream.py b/sugar/p2p/Stream.py
index 375c66e..45d61a5 100644
--- a/sugar/p2p/Stream.py
+++ b/sugar/p2p/Stream.py
@@ -1,44 +1,49 @@
import xmlrpclib
import socket
import traceback
+import random
import network
from MostlyReliablePipe import MostlyReliablePipe
+from sugar.presence import Service
class Stream(object):
- def __init__(self, service, group):
- if not service:
- raise ValueError("service must be valid")
+ def __init__(self, service):
+ if not isinstance(service, Service.Service):
+ raise ValueError("service must be valid.")
+ if not service.get_port():
+ raise ValueError("service must have an address.")
self._service = service
- self._group = group
- self._owner_nick_name = self._group.get_owner().get_nick_name()
- self._port = self._service.get_port()
+ self._reader_port = self._service.get_port()
+ self._writer_port = self._reader_port
self._address = self._service.get_address()
self._callback = None
- def new_from_service(service, group):
- if service.is_multicast():
- return MulticastStream(service, group)
+ def new_from_service(service, start_reader=True):
+ if not isinstance(service, Service.Service):
+ raise ValueError("service must be valid.")
+ if service.is_multicast_service():
+ return MulticastStream(service)
else:
- return UnicastStream(service, group)
+ return UnicastStream(service, start_reader)
new_from_service = staticmethod(new_from_service)
def set_data_listener(self, callback):
self._callback = callback
- def recv(self, nick_name, data):
- if nick_name != self._owner_nick_name:
- if self._callback:
- self._callback(self._group.get_buddy(nick_name), data)
+ def _recv(self, address, data):
+ if self._callback:
+ self._callback(data)
class UnicastStreamWriter(object):
- def __init__(self, stream, service, owner_nick_name):
+ def __init__(self, stream, service):
# set up the writer
- if not service:
+ if not isinstance(service, Service.Service):
raise ValueError("service must be valid")
self._service = service
- self._owner_nick_name = owner_nick_name
+ if not service.get_address():
+ raise ValueError("service must have a valid address.")
self._address = self._service.get_address()
self._port = self._service.get_port()
self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
@@ -47,7 +52,7 @@ class UnicastStreamWriter(object):
def write(self, xmlrpc_data):
"""Write some data to the default endpoint of this pipe on the remote server."""
try:
- self._writer.message(None, None, self._owner_nick_name, xmlrpc_data)
+ self._writer.message(None, None, xmlrpc_data)
return True
except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
traceback.print_exc()
@@ -65,58 +70,79 @@ class UnicastStreamWriter(object):
class UnicastStream(Stream):
- def __init__(self, service, group):
- Stream.__init__(self, service, group)
- self._setup()
-
- def _setup(self):
+ def __init__(self, service, start_reader=True):
+ """Initializes the stream. If the 'start_reader' argument is True,
+ the stream will initialize and start a new stream reader, if it
+ is False, no reader will be created and the caller must call the
+ start_reader() method to start the stream reader and be able to
+ receive any data from the stream."""
+ Stream.__init__(self, service)
+ if start_reader:
+ self.start_reader()
+
+ def start_reader(self, update_service_port=True):
+ """Start the stream's reader, which for UnicastStream objects is
+ and XMLRPC server. If there's a port conflict with some other
+ service, the reader will try to find another port to use instead.
+ Returns the port number used for the reader."""
# Set up the reader
started = False
tries = 10
- port = self._service.get_port()
self._reader = None
while not started and tries > 0:
try:
- self._reader = network.GlibXMLRPCServer(("", port))
+ self._reader = network.GlibXMLRPCServer(("", self._reader_port))
self._reader.register_function(self._message, "message")
+ if update_service_port:
+ self._service.set_port(self._reader_port) # Update the service's port
started = True
except(socket.error):
- port = port + 1
+ self._reader_port = random.randint(self._reader_port + 1, 65500)
tries = tries - 1
if self._reader is None:
- print 'Could not start xmlrpc server.'
- self._service.set_port(port)
+ print 'Could not start stream reader.'
+ return self._reader_port
- def _message(self, nick_name, message):
+ def _message(self, message):
"""Called by the XMLRPC server when network data arrives."""
- self.recv(nick_name, message)
+ address = network.get_authinfo()
+ self._recv(address, message)
return True
- def register_handler(self, handler, name):
+ def register_reader_handler(self, handler, name):
+ """Register a custom message handler with the reader. This call
+ adds a custom XMLRPC method call with the name 'name' to the reader's
+ XMLRPC server, which then calls the 'handler' argument back when
+ a method call for it arrives over the network."""
if name == "message":
raise ValueError("Handler name 'message' is a reserved handler.")
self._reader.register_function(handler, name)
def new_writer(self, service):
- return UnicastStreamWriter(self, service, self._owner_nick_name)
+ """Return a new stream writer object."""
+ return UnicastStreamWriter(self, service)
class MulticastStream(Stream):
- def __init__(self, service, group):
- Stream.__init__(self, service, group)
- self._address = self._service.get_group_address()
- self._setup()
-
- def _setup(self):
- self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb)
+ def __init__(self, service):
+ Stream.__init__(self, service)
+ self._internal_start_reader()
+
+ def start_reader(self):
+ return self._reader_port
+
+ def _internal_start_reader(self):
+ if not service.get_address():
+ raise ValueError("service must have a valid address.")
+ self._pipe = MostlyReliablePipe('', self._address, self._reader_port,
+ self._recv_data_cb)
self._pipe.start()
def write(self, data):
- self._pipe.send(self._owner_nick_name + " |**| " + data)
+ self._pipe.send(data)
- def _recv_data_cb(self, addr, data, user_data=None):
- [ nick_name, data ] = data.split(" |**| ", 2)
- self.recv(nick_name, data)
+ def _recv_data_cb(self, address, data, user_data=None):
+ self._recv(address, data)
def new_writer(self, service=None):
return self