From d0f23744f05d1c91c50ec41938c25919179feb9a Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 19 Sep 2006 17:54:32 +0000 Subject: Use asynchronous service resolution to capture service updates too --- (limited to 'services') diff --git a/services/presence/PresenceService.py b/services/presence/PresenceService.py index e541387..976d5e0 100644 --- a/services/presence/PresenceService.py +++ b/services/presence/PresenceService.py @@ -29,6 +29,11 @@ class ServiceAdv(object): raise ValueError("local must be a boolean.") self._local = local self._state = _SA_UNRESOLVED + self._resolver = None + + def __del__(self): + if self._resolver: + del self._resolver def interface(self): return self._interface @@ -47,7 +52,14 @@ class ServiceAdv(object): def set_service(self, service): if not isinstance(service, Service.Service): raise ValueError("must be a valid service.") - self._service = service + if service != self._service: + self._service = service + def resolver(self): + return self._resolver + def set_resolver(self, resolver): + if not isinstance(resolver, dbus.Interface): + raise ValueError("must be a valid dbus object") + self._resolver = resolver def state(self): return self._state def set_state(self, state): @@ -452,12 +464,16 @@ class PresenceService(object): self._dbus_helper.ActivityDisappeared(activity.object_path()) del self._activities[actid] - def _resolve_service_reply_cb(self, adv, interface, protocol, full_name, - stype, domain, host, aprotocol, address, port, txt, flags): + def _service_resolved_cb(self, adv, interface, protocol, full_name, + stype, domain, host, aprotocol, address, port, txt, flags, + updated): """When the service discovery finally gets here, we've got enough information about the service to assign it to a buddy.""" - logging.debug("Resolved service '%s' type '%s' domain '%s' to " \ - " %s:%s" % (full_name, stype, domain, address, port)) + tag = "Resolved" + if updated: + tag = "Updated" + logging.debug("%s service '%s' type '%s' domain '%s' to " \ + " %s:%s" % (tag, full_name, stype, domain, address, port)) if not adv in self._service_advs: return False @@ -465,25 +481,33 @@ class PresenceService(object): return False # See if we know about this service already + service = None key = (full_name, stype) + props = _txt_to_dict(txt) if not self._services.has_key(key): objid = self._get_next_object_id() - props = _txt_to_dict(txt) service = Service.Service(self._bus_name, objid, name=full_name, stype=stype, domain=domain, address=address, port=port, - properties=props, source_address=address, local=adv.is_local()) + properties=props, source_address=address) self._services[key] = service else: - # Already tracking this service; likely we were the one that shared it - # in the first place, and therefore the source address would not have - # been set yet + # Already tracking this service; either: + # a) we were the one that shared it in the first place, + # and therefore the source address would not have + # been set yet + # b) the service has been updated service = self._services[key] if not service.get_source_address(): service.set_source_address(address) if not service.get_address(): service.set_address(address) + adv.set_service(service) + if service and updated: + service.set_properties(props, from_network=True) + return False + # Merge the service into our buddy and activity lists, if needed buddy = self._handle_new_service_for_buddy(service, adv.is_local()) if buddy and service.get_activity_id(): @@ -491,24 +515,34 @@ class PresenceService(object): return False - def _resolve_service_reply_cb_glue(self, adv, interface, protocol, name, + def _service_resolved_cb_glue(self, adv, interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags): + # Avahi doesn't flag updates to existing services, so we have + # to determine that here + updated = False + if adv.state() == _SA_RESOLVED: + updated = True + adv.set_state(_SA_RESOLVED) - gobject.idle_add(self._resolve_service_reply_cb, adv, interface, + gobject.idle_add(self._service_resolved_cb, adv, interface, protocol, name, stype, domain, host, aprotocol, address, - port, txt, flags) + port, txt, flags, updated) - def _resolve_service_error_handler(self, adv, err): + def _service_resolved_failure_cb(self, adv, err): adv.set_state(_SA_UNRESOLVED) logging.error("Error resolving service %s.%s: %s" % (adv.name(), adv.stype(), err)) def _resolve_service(self, adv): """Resolve and lookup a ZeroConf service to obtain its address and TXT records.""" # Ask avahi to resolve this particular service - self._mdns_service.ResolveService(int(adv.interface()), int(adv.protocol()), adv.name(), - adv.stype(), adv.domain(), avahi.PROTO_UNSPEC, dbus.UInt32(0), - reply_handler=lambda *args: self._resolve_service_reply_cb_glue(adv, *args), - error_handler=lambda *args: self._resolve_service_error_handler(adv, *args)) + path = self._mdns_service.ServiceResolverNew(dbus.Int32(adv.interface()), + dbus.Int32(adv.protocol()), adv.name(), adv.stype(), adv.domain(), + avahi.PROTO_UNSPEC, dbus.UInt32(0)) + resolver = dbus.Interface(self._system_bus.get_object(avahi.DBUS_NAME, path), + avahi.DBUS_INTERFACE_SERVICE_RESOLVER) + resolver.connect_to_signal('Found', lambda *args: self._service_resolved_cb_glue(adv, *args)) + resolver.connect_to_signal('Failure', lambda *args: self._service_resolved_failure_cb(adv, *args)) + adv.set_resolver(resolver) return False def _service_appeared_cb(self, interface, protocol, full_name, stype, domain, flags): @@ -679,6 +713,11 @@ class PresenceService(object): def register_service(self, name, stype, properties={}, address=None, port=-1, domain=u"local", sender=None): """Register a new service, advertising it to other Buddies on the network.""" + # Refuse to register if we can't get the dbus connection this request + # came from for some reason + if not sender: + raise RuntimeError("Service registration request must have a sender.") + (actid, person_name) = Service.decompose_service_name(name) if self.get_owner() and person_name != self.get_owner().get_name(): raise RuntimeError("Tried to register a service that didn't have" \ @@ -688,49 +727,18 @@ class PresenceService(object): if not port or port == -1: port = random.randint(4000, 65000) - try: - obj = self._system_bus.get_object(avahi.DBUS_NAME, self._mdns_service.EntryGroupNew()) - group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP) - - # Add properties; ensure they are converted to ByteArray types - # because python sometimes can't figure that out - info = dbus.Array([], signature="aay") - for k, v in properties.items(): - info.append(dbus.types.ByteArray("%s=%s" % (k, v))) - - objid = self._get_next_object_id() - service = Service.Service(self._bus_name, objid, name=name, - stype=stype, domain=domain, address=address, port=port, - properties=properties, source_address=None, local=True, - local_publisher=sender) - self._services[(name, stype)] = service - port = service.get_port() - - logging.debug("Will register service with name='%s', stype='%s'," \ - " domain='%s', address='%s', port=%d, info='%s'" % (name, stype, - domain, address, port, info)) - group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, dbus.String(name), - dbus.String(stype), dbus.String(domain), dbus.String(""), # let Avahi figure the 'host' out - dbus.UInt16(port), info) - service.set_avahi_entry_group(group) - group.Commit() - except dbus.exceptions.DBusException, exc: - # FIXME: ignore local name collisions, since that means - # the zeroconf service is already registered. Ideally we - # should un-register it an re-register with the correct info - if str(exc) == "Local name collision": - pass + objid = self._get_next_object_id() + service = Service.Service(self._bus_name, objid, name=name, + stype=stype, domain=domain, address=address, port=port, + properties=properties, source_address=None, + local_publisher=sender) + self._services[(name, stype)] = service self.register_service_type(stype) + service.register(self._system_bus, self._mdns_service) return service def unregister_service(self, service, sender=None): - local_publisher = service.get_local_publisher() - if sender is not None and local_publisher != sender: - raise ValueError("Service was not registered by requesting process!") - group = service.get_avahi_entry_group() - if not group: - raise ValueError("Service was not a local service provided by this laptop!") - group.Free() + service.unregister(sender) def register_service_type(self, stype): """Requests that the Presence service look for and recognize diff --git a/services/presence/Service.py b/services/presence/Service.py index 0c23600..1b29f98 100644 --- a/services/presence/Service.py +++ b/services/presence/Service.py @@ -4,6 +4,7 @@ sys.path.insert(0, os.path.abspath("../../")) from sugar import util import dbus, dbus.service import random +import logging def compose_service_name(name, activity_id): if type(name) == type(""): @@ -37,6 +38,31 @@ def decompose_service_name(name): return (None, name) return (activity_id, name[:start - 2]) +def _one_dict_differs(dict1, dict2): + diff_keys = [] + for key, value in dict1.items(): + if not dict2.has_key(key) or dict2[key] != value: + diff_keys.append(key) + return diff_keys + +def _dicts_differ(dict1, dict2): + diff_keys = [] + diff1 = _one_dict_differs(dict1, dict2) + diff2 = _one_dict_differs(dict2, dict1) + for key in diff2: + if key not in diff1: + diff_keys.append(key) + diff_keys += diff1 + return diff_keys + +def _convert_properties_to_dbus_byte_array(props): + # Ensure properties are converted to ByteArray types + # because python sometimes can't figure that out + info = dbus.Array([], signature="aay") + for k, v in props.items(): + info.append(dbus.types.ByteArray("%s=%s" % (k, v))) + return info + _ACTIVITY_ID_TAG = "ActivityID" SERVICE_DBUS_INTERFACE = "org.laptop.Presence.Service" @@ -104,15 +130,18 @@ class ServiceDBusHelper(dbus.service.Object): @dbus.service.method(SERVICE_DBUS_INTERFACE, in_signature="a{sv}", sender_keyword="sender") def setPublishedValues(self, values, sender): + if not self._parent.is_local(): + raise ValueError("Service was not not registered by requesting process!") self._parent.set_properties(values, sender) class Service(object): """Encapsulates information about a specific ZeroConf/mDNS service as advertised on the network.""" + def __init__(self, bus_name, object_id, name, stype, domain=u"local", address=None, port=-1, properties=None, source_address=None, - local=False, local_publisher=None): + local_publisher=None): if not bus_name: raise ValueError("DBus bus name must be valid") if not object_id or type(object_id) != type(1): @@ -136,6 +165,12 @@ class Service(object): if domain and domain != "local": raise ValueError("must use the 'local' domain (for now).") + # ID of the D-Bus connection that published this service, if any. + # We only let the local publisher modify the service. + self._local_publisher = local_publisher + + self._avahi_entry_group = None + (actid, real_name) = decompose_service_name(name) self._name = real_name self._full_name = name @@ -143,10 +178,9 @@ class Service(object): self._domain = domain self._port = -1 self.set_port(port) - self._properties = {} - self._internal_set_properties(properties, first_time=True) - self._avahi_entry_group = None - self._local = local + self._properties = None + self._dbus_helper = None + self._internal_set_properties(properties) # Source address is the unicast source IP self._source_address = None @@ -175,10 +209,6 @@ class Service(object): self._owner = None - # ID of the D-Bus connection that published this service, if any. - # We only let the local publisher modify the service. - self._local_publisher = local_publisher - # register ourselves with dbus self._object_id = object_id self._object_path = SERVICE_DBUS_OBJECT_PATH + str(self._object_id) @@ -195,11 +225,10 @@ class Service(object): raise RuntimeError("Can only set a service's owner once") self._owner = owner - def get_local_publisher(self): - return self._local_publisher - def is_local(self): - return self._local + if self._local_publisher is not None: + return True + return False def get_name(self): """Return the service's name, usually that of the @@ -225,12 +254,14 @@ class Service(object): def set_property(self, key, value, sender=None): """Set one service property""" + if not self._local_publisher: + raise ValueError("Service was not not registered by requesting process!") if sender is not None and self._local_publisher != sender: raise ValueError("Service was not not registered by requesting process!") if type(key) != type(u""): raise ValueError("Key must be a unicode string.") - if type(value) != type(u"") or type(value) != type(True): + if type(value) != type(u"") and type(value) != type(True): raise ValueError("Key must be a unicode string or a boolean.") # Ignore setting the key to it's current value @@ -254,15 +285,22 @@ class Service(object): if type(value) == type(True): value = "" self._properties[key] = value - self._dbus_helper.PublishedValueChanged(key) - def set_properties(self, properties, sender=None): + # if the service is locally published already, update the TXT records + if self._local_publisher and self._avahi_entry_group: + self.__internal_update_avahi_properties() + + if self._dbus_helper: + self._dbus_helper.PublishedValueChanged([key]) + + def set_properties(self, properties, sender=None, from_network=False): """Set all service properties in one call""" if sender is not None and self._local_publisher != sender: raise ValueError("Service was not not registered by requesting process!") - self._internal_set_properties(properties) - def _internal_set_properties(self, properties, first_time=False): + self._internal_set_properties(properties, from_network) + + def _internal_set_properties(self, properties, from_network=False): """Set the service's properties from either an Avahi TXT record (a list of lists of integers), or a python dictionary.""" @@ -270,6 +308,11 @@ class Service(object): raise ValueError("Properties must be a dictionary.") self._properties = {} + # Make sure the properties are actually different + diff_keys = _dicts_differ(self._properties, properties) + if len(diff_keys) == 0: + return + # Set key/value pairs on internal property list for key, value in properties.items(): if len(key) == 0: @@ -282,8 +325,19 @@ class Service(object): tmp_val = unicode(tmp_val) self._properties[tmp_key] = tmp_val - if not first_time: - self._dbus_helper.PublishedValueChanged(self._properties.keys()) + # if the service is locally published already, update the TXT records + if self._local_publisher and self._avahi_entry_group and not from_network: + self.__internal_update_avahi_properties() + + if self._dbus_helper: + self._dbus_helper.PublishedValueChanged(diff_keys) + + def __internal_update_avahi_properties(self): + info = _convert_properties_to_dbus_byte_array(self._properties) + self._avahi_entry_group.UpdateServiceTxt(avahi.IF_UNSPEC, + avahi.PROTO_UNSPEC, 0, + dbus.String(self._full_name), dbus.String(self._stype), + dbus.String(self._domain), info) def get_type(self): """Return the service's service type.""" @@ -322,11 +376,42 @@ class Service(object): """Return the ZeroConf/mDNS domain the service was found in.""" return self._domain - def set_avahi_entry_group(self, group): - self._avahi_entry_group = group + def register(self, system_bus, avahi_service): + if self._avahi_entry_group is not None: + raise RuntimeError("Service already registered!") + + obj = system_bus.get_object(avahi.DBUS_NAME, avahi_service.EntryGroupNew()) + self._avahi_entry_group = dbus.Interface(obj, avahi.DBUS_INTERFACE_ENTRY_GROUP) + + info = _convert_properties_to_dbus_byte_array(self._properties) + logging.debug("Will register service with name='%s', stype='%s'," \ + " domain='%s', address='%s', port=%d, info='%s'" % (self._full_name, + self._stype, self._domain, self._address, self._port, info)) + self._avahi_entry_group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, + dbus.String(self._full_name), dbus.String(self._stype), + dbus.String(self._domain), dbus.String(""), # let Avahi figure the 'host' out + dbus.UInt16(self._port), info) + self._avahi_entry_group.connect_to_signal('StateChanged', self.__entry_group_changed_cb) + self._avahi_entry_group.Commit() + + def __entry_group_changed_cb(self, state, error): + logging.debug("** %s.%s Entry group changed: state %s, error %s" % (self._full_name, self._stype, state, error)) + + def unregister(self, sender): + # Refuse to unregister if we can't get the dbus connection this request + # came from for some reason + if not sender: + raise RuntimeError("Service registration request must have a sender.") + if not self._local_publisher: + raise ValueError("Service was not a local service provided by this laptop!") + if sender is not None and self._local_publisher != sender: + raise ValueError("Service was not registered by requesting process!") + if not self._avahi_entry_group: + raise ValueError("Service was not registered by requesting process!") + self._avahi_entry_group.Free() + del self._avahi_entry_group + self._avahi_entry_group = None - def get_avahi_entry_group(self): - return self._avahi_entry_group ################################################################# # Tests -- cgit v0.9.1