Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorDan Williams <dcbw@localhost.localdomain>2006-09-19 17:54:32 (GMT)
committer Dan Williams <dcbw@localhost.localdomain>2006-09-19 17:54:32 (GMT)
commitd0f23744f05d1c91c50ec41938c25919179feb9a (patch)
tree42fbe25d3554770f6f7ebbc39c7db248d8135717 /services
parentb6897cf1c59d27cdd6bc32f273af3533959556b2 (diff)
Use asynchronous service resolution to capture service updates too
Diffstat (limited to 'services')
-rw-r--r--services/presence/PresenceService.py122
-rw-r--r--services/presence/Service.py133
2 files changed, 174 insertions, 81 deletions
diff --git a/services/presence/PresenceService.py b/services/presence/PresenceService.py
index e541387..976d5e0 100644
--- a/services/presence/PresenceService.py
+++ b/services/presence/PresenceService.py
@@ -29,6 +29,11 @@ class ServiceAdv(object):
raise ValueError("local must be a boolean.")
self._local = local
self._state = _SA_UNRESOLVED
+ self._resolver = None
+
+ def __del__(self):
+ if self._resolver:
+ del self._resolver
def interface(self):
return self._interface
@@ -47,7 +52,14 @@ class ServiceAdv(object):
def set_service(self, service):
if not isinstance(service, Service.Service):
raise ValueError("must be a valid service.")
- self._service = service
+ if service != self._service:
+ self._service = service
+ def resolver(self):
+ return self._resolver
+ def set_resolver(self, resolver):
+ if not isinstance(resolver, dbus.Interface):
+ raise ValueError("must be a valid dbus object")
+ self._resolver = resolver
def state(self):
return self._state
def set_state(self, state):
@@ -452,12 +464,16 @@ class PresenceService(object):
self._dbus_helper.ActivityDisappeared(activity.object_path())
del self._activities[actid]
- def _resolve_service_reply_cb(self, adv, interface, protocol, full_name,
- stype, domain, host, aprotocol, address, port, txt, flags):
+ def _service_resolved_cb(self, adv, interface, protocol, full_name,
+ stype, domain, host, aprotocol, address, port, txt, flags,
+ updated):
"""When the service discovery finally gets here, we've got enough
information about the service to assign it to a buddy."""
- logging.debug("Resolved service '%s' type '%s' domain '%s' to " \
- " %s:%s" % (full_name, stype, domain, address, port))
+ tag = "Resolved"
+ if updated:
+ tag = "Updated"
+ logging.debug("%s service '%s' type '%s' domain '%s' to " \
+ " %s:%s" % (tag, full_name, stype, domain, address, port))
if not adv in self._service_advs:
return False
@@ -465,25 +481,33 @@ class PresenceService(object):
return False
# See if we know about this service already
+ service = None
key = (full_name, stype)
+ props = _txt_to_dict(txt)
if not self._services.has_key(key):
objid = self._get_next_object_id()
- props = _txt_to_dict(txt)
service = Service.Service(self._bus_name, objid, name=full_name,
stype=stype, domain=domain, address=address, port=port,
- properties=props, source_address=address, local=adv.is_local())
+ properties=props, source_address=address)
self._services[key] = service
else:
- # Already tracking this service; likely we were the one that shared it
- # in the first place, and therefore the source address would not have
- # been set yet
+ # Already tracking this service; either:
+ # a) we were the one that shared it in the first place,
+ # and therefore the source address would not have
+ # been set yet
+ # b) the service has been updated
service = self._services[key]
if not service.get_source_address():
service.set_source_address(address)
if not service.get_address():
service.set_address(address)
+
adv.set_service(service)
+ if service and updated:
+ service.set_properties(props, from_network=True)
+ return False
+
# Merge the service into our buddy and activity lists, if needed
buddy = self._handle_new_service_for_buddy(service, adv.is_local())
if buddy and service.get_activity_id():
@@ -491,24 +515,34 @@ class PresenceService(object):
return False
- def _resolve_service_reply_cb_glue(self, adv, interface, protocol, name,
+ def _service_resolved_cb_glue(self, adv, interface, protocol, name,
stype, domain, host, aprotocol, address, port, txt, flags):
+ # Avahi doesn't flag updates to existing services, so we have
+ # to determine that here
+ updated = False
+ if adv.state() == _SA_RESOLVED:
+ updated = True
+
adv.set_state(_SA_RESOLVED)
- gobject.idle_add(self._resolve_service_reply_cb, adv, interface,
+ gobject.idle_add(self._service_resolved_cb, adv, interface,
protocol, name, stype, domain, host, aprotocol, address,
- port, txt, flags)
+ port, txt, flags, updated)
- def _resolve_service_error_handler(self, adv, err):
+ def _service_resolved_failure_cb(self, adv, err):
adv.set_state(_SA_UNRESOLVED)
logging.error("Error resolving service %s.%s: %s" % (adv.name(), adv.stype(), err))
def _resolve_service(self, adv):
"""Resolve and lookup a ZeroConf service to obtain its address and TXT records."""
# Ask avahi to resolve this particular service
- self._mdns_service.ResolveService(int(adv.interface()), int(adv.protocol()), adv.name(),
- adv.stype(), adv.domain(), avahi.PROTO_UNSPEC, dbus.UInt32(0),
- reply_handler=lambda *args: self._resolve_service_reply_cb_glue(adv, *args),
- error_handler=lambda *args: self._resolve_service_error_handler(adv, *args))
+ path = self._mdns_service.ServiceResolverNew(dbus.Int32(adv.interface()),
+ dbus.Int32(adv.protocol()), adv.name(), adv.stype(), adv.domain(),
+ avahi.PROTO_UNSPEC, dbus.UInt32(0))
+ resolver = dbus.Interface(self._system_bus.get_object(avahi.DBUS_NAME, path),
+ avahi.DBUS_INTERFACE_SERVICE_RESOLVER)
+ resolver.connect_to_signal('Found', lambda *args: self._service_resolved_cb_glue(adv, *args))
+ resolver.connect_to_signal('Failure', lambda *args: self._service_resolved_failure_cb(adv, *args))
+ adv.set_resolver(resolver)
return False
def _service_appeared_cb(self, interface, protocol, full_name, stype, domain, flags):
@@ -679,6 +713,11 @@ class PresenceService(object):
def register_service(self, name, stype, properties={}, address=None,
port=-1, domain=u"local", sender=None):
"""Register a new service, advertising it to other Buddies on the network."""
+ # Refuse to register if we can't get the dbus connection this request
+ # came from for some reason
+ if not sender:
+ raise RuntimeError("Service registration request must have a sender.")
+
(actid, person_name) = Service.decompose_service_name(name)
if self.get_owner() and person_name != self.get_owner().get_name():
raise RuntimeError("Tried to register a service that didn't have" \
@@ -688,49 +727,18 @@ class PresenceService(object):
if not port or port == -1:
port = random.randint(4000, 65000)
- try:
- obj = self._system_bus.get_object(avahi.DBUS_NAME, self._mdns_service.EntryGroupNew())
- group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP)
-
- # Add properties; ensure they are converted to ByteArray types
- # because python sometimes can't figure that out
- info = dbus.Array([], signature="aay")
- for k, v in properties.items():
- info.append(dbus.types.ByteArray("%s=%s" % (k, v)))
-
- objid = self._get_next_object_id()
- service = Service.Service(self._bus_name, objid, name=name,
- stype=stype, domain=domain, address=address, port=port,
- properties=properties, source_address=None, local=True,
- local_publisher=sender)
- self._services[(name, stype)] = service
- port = service.get_port()
-
- logging.debug("Will register service with name='%s', stype='%s'," \
- " domain='%s', address='%s', port=%d, info='%s'" % (name, stype,
- domain, address, port, info))
- group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, dbus.String(name),
- dbus.String(stype), dbus.String(domain), dbus.String(""), # let Avahi figure the 'host' out
- dbus.UInt16(port), info)
- service.set_avahi_entry_group(group)
- group.Commit()
- except dbus.exceptions.DBusException, exc:
- # FIXME: ignore local name collisions, since that means
- # the zeroconf service is already registered. Ideally we
- # should un-register it an re-register with the correct info
- if str(exc) == "Local name collision":
- pass
+ objid = self._get_next_object_id()
+ service = Service.Service(self._bus_name, objid, name=name,
+ stype=stype, domain=domain, address=address, port=port,
+ properties=properties, source_address=None,
+ local_publisher=sender)
+ self._services[(name, stype)] = service
self.register_service_type(stype)
+ service.register(self._system_bus, self._mdns_service)
return service
def unregister_service(self, service, sender=None):
- local_publisher = service.get_local_publisher()
- if sender is not None and local_publisher != sender:
- raise ValueError("Service was not registered by requesting process!")
- group = service.get_avahi_entry_group()
- if not group:
- raise ValueError("Service was not a local service provided by this laptop!")
- group.Free()
+ service.unregister(sender)
def register_service_type(self, stype):
"""Requests that the Presence service look for and recognize
diff --git a/services/presence/Service.py b/services/presence/Service.py
index 0c23600..1b29f98 100644
--- a/services/presence/Service.py
+++ b/services/presence/Service.py
@@ -4,6 +4,7 @@ sys.path.insert(0, os.path.abspath("../../"))
from sugar import util
import dbus, dbus.service
import random
+import logging
def compose_service_name(name, activity_id):
if type(name) == type(""):
@@ -37,6 +38,31 @@ def decompose_service_name(name):
return (None, name)
return (activity_id, name[:start - 2])
+def _one_dict_differs(dict1, dict2):
+ diff_keys = []
+ for key, value in dict1.items():
+ if not dict2.has_key(key) or dict2[key] != value:
+ diff_keys.append(key)
+ return diff_keys
+
+def _dicts_differ(dict1, dict2):
+ diff_keys = []
+ diff1 = _one_dict_differs(dict1, dict2)
+ diff2 = _one_dict_differs(dict2, dict1)
+ for key in diff2:
+ if key not in diff1:
+ diff_keys.append(key)
+ diff_keys += diff1
+ return diff_keys
+
+def _convert_properties_to_dbus_byte_array(props):
+ # Ensure properties are converted to ByteArray types
+ # because python sometimes can't figure that out
+ info = dbus.Array([], signature="aay")
+ for k, v in props.items():
+ info.append(dbus.types.ByteArray("%s=%s" % (k, v)))
+ return info
+
_ACTIVITY_ID_TAG = "ActivityID"
SERVICE_DBUS_INTERFACE = "org.laptop.Presence.Service"
@@ -104,15 +130,18 @@ class ServiceDBusHelper(dbus.service.Object):
@dbus.service.method(SERVICE_DBUS_INTERFACE,
in_signature="a{sv}", sender_keyword="sender")
def setPublishedValues(self, values, sender):
+ if not self._parent.is_local():
+ raise ValueError("Service was not not registered by requesting process!")
self._parent.set_properties(values, sender)
class Service(object):
"""Encapsulates information about a specific ZeroConf/mDNS
service as advertised on the network."""
+
def __init__(self, bus_name, object_id, name, stype, domain=u"local",
address=None, port=-1, properties=None, source_address=None,
- local=False, local_publisher=None):
+ local_publisher=None):
if not bus_name:
raise ValueError("DBus bus name must be valid")
if not object_id or type(object_id) != type(1):
@@ -136,6 +165,12 @@ class Service(object):
if domain and domain != "local":
raise ValueError("must use the 'local' domain (for now).")
+ # ID of the D-Bus connection that published this service, if any.
+ # We only let the local publisher modify the service.
+ self._local_publisher = local_publisher
+
+ self._avahi_entry_group = None
+
(actid, real_name) = decompose_service_name(name)
self._name = real_name
self._full_name = name
@@ -143,10 +178,9 @@ class Service(object):
self._domain = domain
self._port = -1
self.set_port(port)
- self._properties = {}
- self._internal_set_properties(properties, first_time=True)
- self._avahi_entry_group = None
- self._local = local
+ self._properties = None
+ self._dbus_helper = None
+ self._internal_set_properties(properties)
# Source address is the unicast source IP
self._source_address = None
@@ -175,10 +209,6 @@ class Service(object):
self._owner = None
- # ID of the D-Bus connection that published this service, if any.
- # We only let the local publisher modify the service.
- self._local_publisher = local_publisher
-
# register ourselves with dbus
self._object_id = object_id
self._object_path = SERVICE_DBUS_OBJECT_PATH + str(self._object_id)
@@ -195,11 +225,10 @@ class Service(object):
raise RuntimeError("Can only set a service's owner once")
self._owner = owner
- def get_local_publisher(self):
- return self._local_publisher
-
def is_local(self):
- return self._local
+ if self._local_publisher is not None:
+ return True
+ return False
def get_name(self):
"""Return the service's name, usually that of the
@@ -225,12 +254,14 @@ class Service(object):
def set_property(self, key, value, sender=None):
"""Set one service property"""
+ if not self._local_publisher:
+ raise ValueError("Service was not not registered by requesting process!")
if sender is not None and self._local_publisher != sender:
raise ValueError("Service was not not registered by requesting process!")
if type(key) != type(u""):
raise ValueError("Key must be a unicode string.")
- if type(value) != type(u"") or type(value) != type(True):
+ if type(value) != type(u"") and type(value) != type(True):
raise ValueError("Key must be a unicode string or a boolean.")
# Ignore setting the key to it's current value
@@ -254,15 +285,22 @@ class Service(object):
if type(value) == type(True):
value = ""
self._properties[key] = value
- self._dbus_helper.PublishedValueChanged(key)
- def set_properties(self, properties, sender=None):
+ # if the service is locally published already, update the TXT records
+ if self._local_publisher and self._avahi_entry_group:
+ self.__internal_update_avahi_properties()
+
+ if self._dbus_helper:
+ self._dbus_helper.PublishedValueChanged([key])
+
+ def set_properties(self, properties, sender=None, from_network=False):
"""Set all service properties in one call"""
if sender is not None and self._local_publisher != sender:
raise ValueError("Service was not not registered by requesting process!")
- self._internal_set_properties(properties)
- def _internal_set_properties(self, properties, first_time=False):
+ self._internal_set_properties(properties, from_network)
+
+ def _internal_set_properties(self, properties, from_network=False):
"""Set the service's properties from either an Avahi
TXT record (a list of lists of integers), or a
python dictionary."""
@@ -270,6 +308,11 @@ class Service(object):
raise ValueError("Properties must be a dictionary.")
self._properties = {}
+ # Make sure the properties are actually different
+ diff_keys = _dicts_differ(self._properties, properties)
+ if len(diff_keys) == 0:
+ return
+
# Set key/value pairs on internal property list
for key, value in properties.items():
if len(key) == 0:
@@ -282,8 +325,19 @@ class Service(object):
tmp_val = unicode(tmp_val)
self._properties[tmp_key] = tmp_val
- if not first_time:
- self._dbus_helper.PublishedValueChanged(self._properties.keys())
+ # if the service is locally published already, update the TXT records
+ if self._local_publisher and self._avahi_entry_group and not from_network:
+ self.__internal_update_avahi_properties()
+
+ if self._dbus_helper:
+ self._dbus_helper.PublishedValueChanged(diff_keys)
+
+ def __internal_update_avahi_properties(self):
+ info = _convert_properties_to_dbus_byte_array(self._properties)
+ self._avahi_entry_group.UpdateServiceTxt(avahi.IF_UNSPEC,
+ avahi.PROTO_UNSPEC, 0,
+ dbus.String(self._full_name), dbus.String(self._stype),
+ dbus.String(self._domain), info)
def get_type(self):
"""Return the service's service type."""
@@ -322,11 +376,42 @@ class Service(object):
"""Return the ZeroConf/mDNS domain the service was found in."""
return self._domain
- def set_avahi_entry_group(self, group):
- self._avahi_entry_group = group
+ def register(self, system_bus, avahi_service):
+ if self._avahi_entry_group is not None:
+ raise RuntimeError("Service already registered!")
+
+ obj = system_bus.get_object(avahi.DBUS_NAME, avahi_service.EntryGroupNew())
+ self._avahi_entry_group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP)
+
+ info = _convert_properties_to_dbus_byte_array(self._properties)
+ logging.debug("Will register service with name='%s', stype='%s'," \
+ " domain='%s', address='%s', port=%d, info='%s'" % (self._full_name,
+ self._stype, self._domain, self._address, self._port, info))
+ self._avahi_entry_group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0,
+ dbus.String(self._full_name), dbus.String(self._stype),
+ dbus.String(self._domain), dbus.String(""), # let Avahi figure the 'host' out
+ dbus.UInt16(self._port), info)
+ self._avahi_entry_group.connect_to_signal('StateChanged', self.__entry_group_changed_cb)
+ self._avahi_entry_group.Commit()
+
+ def __entry_group_changed_cb(self, state, error):
+ logging.debug("** %s.%s Entry group changed: state %s, error %s" % (self._full_name, self._stype, state, error))
+
+ def unregister(self, sender):
+ # Refuse to unregister if we can't get the dbus connection this request
+ # came from for some reason
+ if not sender:
+ raise RuntimeError("Service registration request must have a sender.")
+ if not self._local_publisher:
+ raise ValueError("Service was not a local service provided by this laptop!")
+ if sender is not None and self._local_publisher != sender:
+ raise ValueError("Service was not registered by requesting process!")
+ if not self._avahi_entry_group:
+ raise ValueError("Service was not registered by requesting process!")
+ self._avahi_entry_group.Free()
+ del self._avahi_entry_group
+ self._avahi_entry_group = None
- def get_avahi_entry_group(self):
- return self._avahi_entry_group
#################################################################
# Tests