From 917344772694674a94a40fd1f47be178873c1e4f Mon Sep 17 00:00:00 2001 From: Simon McVittie Date: Tue, 12 Jun 2007 00:23:09 +0000 Subject: Batch together online/offline events. Implement a queue for initial buddy inspection, to avoid hitting pending call limit; put it in PresenceService so the queue can be shared. --- diff --git a/src/buddy.py b/src/buddy.py index 84f5bea..d0ad63d 100644 --- a/src/buddy.py +++ b/src/buddy.py @@ -295,64 +295,6 @@ class Buddy(ExportedGObject): self._handles[tp_client] = (handle, uid) self.TelepathyHandleAdded(conn.service_name, conn.object_path, handle) - # FIXME: we should probably have a class SomeoneElse(Buddy) for - # everyone who's not the owner - if not self._owner: - self._discover_properties(tp_client, handle, uid) - - def _discover_properties(self, tp_client, handle, uid): - conn = tp_client.get_connection() - - accumulator = {} - - def got_aliases(aliases): - try: - _logger.debug('Buddy %s nick set to %s', self._object_id, - aliases[0]) - accumulator.update({'nick': aliases[0]}) - finally: - self.set_properties(accumulator) - def aliases_error(e): - try: - _logger.warning('Error getting buddy properties for %s: ' - '%s', uid, e) - finally: - self.set_properties(accumulator) - - def get_alias(): - accumulator.setdefault('nick', uid) - if CONN_INTERFACE_ALIASING in conn: - conn[CONN_INTERFACE_ALIASING].RequestAliases([handle], - reply_handler=got_aliases, - error_handler=aliases_error) - else: - self.set_properties(accumulator) - - def got_properties(props): - try: - _logger.debug('Buddy %s properties are %r', self._object_id, - props) - accumulator.update(props) - finally: - get_alias() - def properties_error(e): - try: - _logger.warning('Error getting buddy properties for %s: ' - '%s', uid, e) - finally: - get_alias() - - # Kick off the first request, which is for the properties. - # Chain from there to the aliases request; chain from *there* to - # setting the accumulated properties. - accumulator['color'] = 'white' - if CONN_INTERFACE_BUDDY_INFO in conn: - conn[CONN_INTERFACE_BUDDY_INFO].GetProperties(handle, - byte_arrays=True, reply_handler=got_properties, - error_handler=properties_error) - else: - get_alias() - @dbus.service.signal(_BUDDY_INTERFACE, signature='sou') def TelepathyHandleAdded(self, tp_conn_name, tp_conn_path, handle): """Another Telepathy handle has become associated with the buddy. diff --git a/src/presenceservice.py b/src/presenceservice.py index 5fac8a9..be58407 100644 --- a/src/presenceservice.py +++ b/src/presenceservice.py @@ -16,6 +16,7 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import logging +from itertools import izip from weakref import WeakValueDictionary import dbus @@ -111,14 +112,16 @@ class PresenceService(ExportedGObject): self._activities_by_handle[self._server_plugin] = {} self._server_plugin.connect('status', self._server_status_cb) - self._server_plugin.connect('contact-online', self._contact_online) - self._server_plugin.connect('contact-offline', self._contact_offline) + self._server_plugin.connect('contacts-online', self._contacts_online) + self._server_plugin.connect('contacts-offline', self._contacts_offline) self._server_plugin.connect('activity-invitation', self._activity_invitation) self._server_plugin.connect('private-invitation', self._private_invitation) self._server_plugin.start() + self._contacts_online_queue = [] + # Set up the link local connection self._ll_plugin = LinkLocalPlugin(self._registry, self._owner) self._handles_buddies[self._ll_plugin] = {} @@ -259,26 +262,121 @@ class PresenceService(ExportedGObject): self._buddies[objid] = buddy return buddy - def _contact_online(self, tp, objid, handle, identifier): - _logger.debug('Handle %u, .../%s is now online', handle, objid) - buddy = self.get_buddy(objid) + def _contacts_online(self, tp, objids, handles, identifiers): + # we'll iterate over handles many times, so make sure that will + # work + if not isinstance(handles, (list, tuple)): + handles = tuple(handles) + + for objid, handle, identifier in izip(objids, handles, identifiers): + _logger.debug('Handle %u, .../%s is now online', handle, objid) + buddy = self.get_buddy(objid) - self._handles_buddies[tp][handle] = buddy - # store the handle of the buddy for this CM - buddy.add_telepathy_handle(tp, handle, identifier) + self._handles_buddies[tp][handle] = buddy + # Store the handle of the buddy for this CM. This doesn't + # fetch anything over D-Bus, to avoid reaching the pending-call + # limit. + buddy.add_telepathy_handle(tp, handle, identifier) conn = tp.get_connection() - # Kick off a request for their current activities. This isn't done - # internally by the Buddy itself, because when we get the activities - # back, we actually want to feed them to the Activity objects. + if not self._contacts_online_queue: + gobject.idle_add(self._run_contacts_online_queue) + + def handle_error(e, when): + gobject.idle_add(self._run_contacts_online_queue) + _logger.warning('Error %s: %s', when, e) + + if CONN_INTERFACE_ALIASING in conn: + def got_aliases(aliases): + gobject.idle_add(self._run_contacts_online_queue) + for contact, alias in izip(handles, aliases): + self._buddy_properties_changed(tp, contact, + {'nick': alias}) + def request_aliases(): + try: + conn[CONN_INTERFACE_ALIASING].RequestAliases(handles, + reply_handler=got_aliases, + error_handler=lambda e: + handle_error(e, 'fetching aliases')) + except Exception, e: + gobject.idle_add(self._run_contacts_online_queue) + handle_error(e, 'fetching aliases') + self._contacts_online_queue.append(request_aliases) - def got_activities(activities): - self._buddy_activities_changed(tp, handle, activities) - conn[CONN_INTERFACE_BUDDY_INFO].GetActivities(handle, - reply_handler=got_activities, - error_handler=lambda e: _logger.warning('%r: Error getting ' - 'activities: %s', buddy, e)) + for handle in handles: + self._queue_contact_online(tp, handle) + + if CONN_INTERFACE_AVATARS in conn: + def got_avatar_tokens(tokens): + gobject.idle_add(self._run_contacts_online_queue) + for contact, token in izip(handles, tokens): + self._avatar_updated(tp, contact, token) + def get_avatar_tokens(): + try: + conn[CONN_INTERFACE_AVATARS].GetAvatarTokens(handles, + reply_handler=got_avatar_tokens, + error_handler=lambda e: + handle_error(e, 'fetching avatar tokens')) + except Exception, e: + gobject.idle_add(self._run_contacts_online_queue) + handle_error(e, 'fetching avatar tokens') + self._contacts_online_queue.append(get_avatar_tokens) + + def _queue_contact_online(self, tp, contact): + conn = tp.get_connection() + + if CONN_INTERFACE_BUDDY_INFO in conn: + def handle_error(e, when): + gobject.idle_add(self._run_contacts_online_queue) + _logger.warning('Error %s: %s', when, e) + def got_properties(props): + gobject.idle_add(self._run_contacts_online_queue) + self._buddy_properties_changed(tp, contact, props) + def get_properties(): + try: + conn[CONN_INTERFACE_BUDDY_INFO].GetProperties(contact, + byte_arrays=True, reply_handler=got_properties, + error_handler=lambda e: + handle_error(e, 'fetching buddy properties')) + except Exception, e: + gobject.idle_add(self._run_contacts_online_queue) + handle_error(e, 'fetching buddy properties') + def get_current_activity(): + try: + conn[CONN_INTERFACE_BUDDY_INFO].GetCurrentActivity(contact, + reply_handler=lambda c, room: + got_properties({'current-activity': c}), + error_handler=lambda e: + handle_error(e, 'fetching current activity')) + except Exception, e: + gobject.idle_add(self._run_contacts_online_queue) + handle_error(e, 'fetching current activity') + def got_activities(activities): + gobject.idle_add(self._run_contacts_online_queue) + self._buddy_activities_changed(tp, contact, activities) + def get_activities(): + try: + conn[CONN_INTERFACE_BUDDY_INFO].GetActivities(contact, + reply_handler=got_activities, + error_handler=lambda e: + handle_error(e, 'fetching activities')) + except Exception, e: + gobject.idle_add(self._run_contacts_online_queue) + handle_error(e, 'fetching activities') + + self._contacts_online_queue.append(get_properties) + self._contacts_online_queue.append(get_current_activity) + self._contacts_online_queue.append(get_activities) + + def _run_contacts_online_queue(self): + try: + callback = self._contacts_online_queue.pop(0) + except IndexError: + pass + else: + callback() + return False def _buddy_validity_changed_cb(self, buddy, valid): if valid: @@ -296,13 +394,14 @@ class PresenceService(ExportedGObject): if buddy.props.valid: self._buddy_validity_changed_cb(buddy, False) - def _contact_offline(self, tp, handle): - buddy = self._handles_buddies[tp].pop(handle, None) - # the handle of the buddy for this CM is not valid anymore - # (this might trigger _buddy_disappeared_cb if they are not visible - # via any CM) - if buddy is not None: - buddy.remove_telepathy_handle(tp) + def _contacts_offline(self, tp, handles): + for handle in handles: + buddy = self._handles_buddies[tp].pop(handle, None) + # the handle of the buddy for this CM is not valid anymore + # (this might trigger _buddy_disappeared_cb if they are not + # visible via any CM) + if buddy is not None: + buddy.remove_telepathy_handle(tp) def _get_next_object_id(self): """Increment and return the object ID counter.""" diff --git a/src/server_plugin.py b/src/server_plugin.py index 0b4cdd2..4d0c579 100644 --- a/src/server_plugin.py +++ b/src/server_plugin.py @@ -67,17 +67,16 @@ class ServerPlugin(gobject.GObject): to implement the PresenceService. """ __gsignals__ = { - 'contact-online': - # Contact has come online and we've discovered all their buddy - # properties. + 'contacts-online': + # Contacts in the subscribe list have come online. # args: - # contact identification (based on key ID or JID): str - # contact handle: int or long - # contact identifier (JID): str or unicode - (gobject.SIGNAL_RUN_FIRST, None, [str, object, object]), - 'contact-offline': - # Contact has gone offline. - # args: contact handle + # contact identification (based on key ID or JID): list of str + # contact handle: list of int or long + # contact identifier (JID): list of str or unicode + (gobject.SIGNAL_RUN_FIRST, None, [object, object, object]), + 'contacts-offline': + # Contacts in the subscribe list have gone offline. + # args: iterable over contact handles (gobject.SIGNAL_RUN_FIRST, None, [object]), 'status': # Connection status changed. @@ -114,7 +113,8 @@ class ServerPlugin(gobject.GObject): self._matches = [] self._registry = registry - self._online_contacts = {} # handle -> jid + #: set of contact handles: those for whom we've emitted contacts-online + self._online_contacts = set() self._owner = owner self.self_handle = None @@ -308,8 +308,7 @@ class ServerPlugin(gobject.GObject): # FIXME: do this async? self.self_handle = self._conn[CONN_INTERFACE].GetSelfHandle() self.self_identifier = self._conn[CONN_INTERFACE].InspectHandles( - HANDLE_TYPE_CONTACT, self.self_handle)[0] - self._online_contacts[self.self_handle] = self.self_identifier + HANDLE_TYPE_CONTACT, [self.self_handle])[0] # request subscriptions from people subscribed to us if we're not # subscribed to them @@ -417,21 +416,18 @@ class ServerPlugin(gobject.GObject): self._conn = None self._conn_status = CONNECTION_STATUS_DISCONNECTED - for handle in self._online_contacts.keys(): - self._contact_offline(handle) - self._online_contacts = {} + if self._online_contacts: + self._contacts_offline(self._online_contacts) if self._reconnect_id > 0: gobject.source_remove(self._reconnect_id) self._reconnect_id = 0 - def _contact_offline(self, handle): - """Handle contact going offline (send message, update set)""" - if not self._online_contacts.has_key(handle): - return - if self._online_contacts[handle]: - self.emit("contact-offline", handle) - del self._online_contacts[handle] + def _contacts_offline(self, handles): + """Handle contacts going offline (send message, update set)""" + self._online_contacts -= handles + _logger.debug('Contacts now offline: %r', handles) + self.emit("contacts-offline", handles) def _contacts_online(self, handles): """Handle contacts coming online""" @@ -449,14 +445,22 @@ class ServerPlugin(gobject.GObject): # else it's probably a channel-specific handle - can't create a # Buddy object for those yet + if not relevant: + return + jids = self._conn[CONN_INTERFACE].InspectHandles( HANDLE_TYPE_CONTACT, relevant) - objids = self.identify_contacts(None, relevant, jids) + handle_to_objid = self.identify_contacts(None, relevant, jids) + objids = [] + for handle in relevant: + objids.append(handle_to_objid[handle]) - for handle, jid, objid in izip(relevant, jids, objids): - self._online_contacts[handle] = jid - self.emit('contact-online', objid, handle, jid) + self._online_contacts |= frozenset(relevant) + _logger.debug('Contacts now online:') + for handle, objid in izip(relevant, objids): + _logger.debug(' %u .../%s', handle, objid) + self.emit('contacts-online', objids, relevant, jids) def _subscribe_members_changed_cb(self, message, added, removed, local_pending, remote_pending, @@ -496,31 +500,24 @@ class ServerPlugin(gobject.GObject): def _presence_update_cb(self, presence): """Send update for online/offline status of presence""" - now_online = [] - now_offline = [] + now_online = set() + now_offline = set(presence.iterkeys()) for handle in presence: timestamp, statuses = presence[handle] - online = handle in self._online_contacts for status, params in statuses.items(): - if not online and status == "offline": - # weren't online in the first place... - continue - jid = self._conn[CONN_INTERFACE].InspectHandles( - HANDLE_TYPE_CONTACT, [handle])[0] - olstr = "ONLINE" - if not online: olstr = "OFFLINE" - _logger.debug("Handle %s (%s) was %s, status now '%s'.", - handle, jid, olstr, status) - if not online and status in ["available", "away", "brb", - "busy", "dnd", "xa"]: - now_online.append(handle) - elif status in ["offline", "invisible"]: - now_offline.append(handle) - - self._contacts_online(now_online) - for handle in now_offline: - self._contact_offline(handle) + # FIXME: use correct logic involving the GetStatuses method + if status in ["available", "away", "brb", "busy", "dnd", "xa"]: + now_online.add(handle) + now_offline.discard(handle) + + now_online -= self._online_contacts + now_offline &= self._online_contacts + + if now_online: + self._contacts_online(now_online) + if now_offline: + self._contacts_offline(now_offline) def _new_channel_cb(self, object_path, channel_type, handle_type, handle, suppress_handler): -- cgit v0.9.1