Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon McVittie <simon.mcvittie@collabora.co.uk>2007-06-12 00:23:09 (GMT)
committer Simon McVittie <simon.mcvittie@collabora.co.uk>2007-06-12 00:23:09 (GMT)
commit917344772694674a94a40fd1f47be178873c1e4f (patch)
tree8b97e3f746a1435c60b2b1030d9602e509809a28
parentad3ec6d0be088eb3545078548d8b2bb8f1c0070f (diff)
Batch together online/offline events. Implement a queue for initial buddy inspection, to avoid hitting pending call limit; put it in PresenceService so the queue can be shared.
-rw-r--r--src/buddy.py58
-rw-r--r--src/presenceservice.py147
-rw-r--r--src/server_plugin.py93
3 files changed, 168 insertions, 130 deletions
diff --git a/src/buddy.py b/src/buddy.py
index 84f5bea..d0ad63d 100644
--- a/src/buddy.py
+++ b/src/buddy.py
@@ -295,64 +295,6 @@ class Buddy(ExportedGObject):
self._handles[tp_client] = (handle, uid)
self.TelepathyHandleAdded(conn.service_name, conn.object_path, handle)
- # FIXME: we should probably have a class SomeoneElse(Buddy) for
- # everyone who's not the owner
- if not self._owner:
- self._discover_properties(tp_client, handle, uid)
-
- def _discover_properties(self, tp_client, handle, uid):
- conn = tp_client.get_connection()
-
- accumulator = {}
-
- def got_aliases(aliases):
- try:
- _logger.debug('Buddy %s nick set to %s', self._object_id,
- aliases[0])
- accumulator.update({'nick': aliases[0]})
- finally:
- self.set_properties(accumulator)
- def aliases_error(e):
- try:
- _logger.warning('Error getting buddy properties for %s: '
- '%s', uid, e)
- finally:
- self.set_properties(accumulator)
-
- def get_alias():
- accumulator.setdefault('nick', uid)
- if CONN_INTERFACE_ALIASING in conn:
- conn[CONN_INTERFACE_ALIASING].RequestAliases([handle],
- reply_handler=got_aliases,
- error_handler=aliases_error)
- else:
- self.set_properties(accumulator)
-
- def got_properties(props):
- try:
- _logger.debug('Buddy %s properties are %r', self._object_id,
- props)
- accumulator.update(props)
- finally:
- get_alias()
- def properties_error(e):
- try:
- _logger.warning('Error getting buddy properties for %s: '
- '%s', uid, e)
- finally:
- get_alias()
-
- # Kick off the first request, which is for the properties.
- # Chain from there to the aliases request; chain from *there* to
- # setting the accumulated properties.
- accumulator['color'] = 'white'
- if CONN_INTERFACE_BUDDY_INFO in conn:
- conn[CONN_INTERFACE_BUDDY_INFO].GetProperties(handle,
- byte_arrays=True, reply_handler=got_properties,
- error_handler=properties_error)
- else:
- get_alias()
-
@dbus.service.signal(_BUDDY_INTERFACE, signature='sou')
def TelepathyHandleAdded(self, tp_conn_name, tp_conn_path, handle):
"""Another Telepathy handle has become associated with the buddy.
diff --git a/src/presenceservice.py b/src/presenceservice.py
index 5fac8a9..be58407 100644
--- a/src/presenceservice.py
+++ b/src/presenceservice.py
@@ -16,6 +16,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import logging
+from itertools import izip
from weakref import WeakValueDictionary
import dbus
@@ -111,14 +112,16 @@ class PresenceService(ExportedGObject):
self._activities_by_handle[self._server_plugin] = {}
self._server_plugin.connect('status', self._server_status_cb)
- self._server_plugin.connect('contact-online', self._contact_online)
- self._server_plugin.connect('contact-offline', self._contact_offline)
+ self._server_plugin.connect('contacts-online', self._contacts_online)
+ self._server_plugin.connect('contacts-offline', self._contacts_offline)
self._server_plugin.connect('activity-invitation',
self._activity_invitation)
self._server_plugin.connect('private-invitation',
self._private_invitation)
self._server_plugin.start()
+ self._contacts_online_queue = []
+
# Set up the link local connection
self._ll_plugin = LinkLocalPlugin(self._registry, self._owner)
self._handles_buddies[self._ll_plugin] = {}
@@ -259,26 +262,121 @@ class PresenceService(ExportedGObject):
self._buddies[objid] = buddy
return buddy
- def _contact_online(self, tp, objid, handle, identifier):
- _logger.debug('Handle %u, .../%s is now online', handle, objid)
- buddy = self.get_buddy(objid)
+ def _contacts_online(self, tp, objids, handles, identifiers):
+ # we'll iterate over handles many times, so make sure that will
+ # work
+ if not isinstance(handles, (list, tuple)):
+ handles = tuple(handles)
+
+ for objid, handle, identifier in izip(objids, handles, identifiers):
+ _logger.debug('Handle %u, .../%s is now online', handle, objid)
+ buddy = self.get_buddy(objid)
- self._handles_buddies[tp][handle] = buddy
- # store the handle of the buddy for this CM
- buddy.add_telepathy_handle(tp, handle, identifier)
+ self._handles_buddies[tp][handle] = buddy
+ # Store the handle of the buddy for this CM. This doesn't
+ # fetch anything over D-Bus, to avoid reaching the pending-call
+ # limit.
+ buddy.add_telepathy_handle(tp, handle, identifier)
conn = tp.get_connection()
- # Kick off a request for their current activities. This isn't done
- # internally by the Buddy itself, because when we get the activities
- # back, we actually want to feed them to the Activity objects.
+ if not self._contacts_online_queue:
+ gobject.idle_add(self._run_contacts_online_queue)
+
+ def handle_error(e, when):
+ gobject.idle_add(self._run_contacts_online_queue)
+ _logger.warning('Error %s: %s', when, e)
+
+ if CONN_INTERFACE_ALIASING in conn:
+ def got_aliases(aliases):
+ gobject.idle_add(self._run_contacts_online_queue)
+ for contact, alias in izip(handles, aliases):
+ self._buddy_properties_changed(tp, contact,
+ {'nick': alias})
+ def request_aliases():
+ try:
+ conn[CONN_INTERFACE_ALIASING].RequestAliases(handles,
+ reply_handler=got_aliases,
+ error_handler=lambda e:
+ handle_error(e, 'fetching aliases'))
+ except Exception, e:
+ gobject.idle_add(self._run_contacts_online_queue)
+ handle_error(e, 'fetching aliases')
+ self._contacts_online_queue.append(request_aliases)
- def got_activities(activities):
- self._buddy_activities_changed(tp, handle, activities)
- conn[CONN_INTERFACE_BUDDY_INFO].GetActivities(handle,
- reply_handler=got_activities,
- error_handler=lambda e: _logger.warning('%r: Error getting '
- 'activities: %s', buddy, e))
+ for handle in handles:
+ self._queue_contact_online(tp, handle)
+
+ if CONN_INTERFACE_AVATARS in conn:
+ def got_avatar_tokens(tokens):
+ gobject.idle_add(self._run_contacts_online_queue)
+ for contact, token in izip(handles, tokens):
+ self._avatar_updated(tp, contact, token)
+ def get_avatar_tokens():
+ try:
+ conn[CONN_INTERFACE_AVATARS].GetAvatarTokens(handles,
+ reply_handler=got_avatar_tokens,
+ error_handler=lambda e:
+ handle_error(e, 'fetching avatar tokens'))
+ except Exception, e:
+ gobject.idle_add(self._run_contacts_online_queue)
+ handle_error(e, 'fetching avatar tokens')
+ self._contacts_online_queue.append(get_avatar_tokens)
+
+ def _queue_contact_online(self, tp, contact):
+ conn = tp.get_connection()
+
+ if CONN_INTERFACE_BUDDY_INFO in conn:
+ def handle_error(e, when):
+ gobject.idle_add(self._run_contacts_online_queue)
+ _logger.warning('Error %s: %s', when, e)
+ def got_properties(props):
+ gobject.idle_add(self._run_contacts_online_queue)
+ self._buddy_properties_changed(tp, contact, props)
+ def get_properties():
+ try:
+ conn[CONN_INTERFACE_BUDDY_INFO].GetProperties(contact,
+ byte_arrays=True, reply_handler=got_properties,
+ error_handler=lambda e:
+ handle_error(e, 'fetching buddy properties'))
+ except Exception, e:
+ gobject.idle_add(self._run_contacts_online_queue)
+ handle_error(e, 'fetching buddy properties')
+ def get_current_activity():
+ try:
+ conn[CONN_INTERFACE_BUDDY_INFO].GetCurrentActivity(contact,
+ reply_handler=lambda c, room:
+ got_properties({'current-activity': c}),
+ error_handler=lambda e:
+ handle_error(e, 'fetching current activity'))
+ except Exception, e:
+ gobject.idle_add(self._run_contacts_online_queue)
+ handle_error(e, 'fetching current activity')
+ def got_activities(activities):
+ gobject.idle_add(self._run_contacts_online_queue)
+ self._buddy_activities_changed(tp, contact, activities)
+ def get_activities():
+ try:
+ conn[CONN_INTERFACE_BUDDY_INFO].GetActivities(contact,
+ reply_handler=got_activities,
+ error_handler=lambda e:
+ handle_error(e, 'fetching activities'))
+ except Exception, e:
+ gobject.idle_add(self._run_contacts_online_queue)
+ handle_error(e, 'fetching activities')
+
+ self._contacts_online_queue.append(get_properties)
+ self._contacts_online_queue.append(get_current_activity)
+ self._contacts_online_queue.append(get_activities)
+
+ def _run_contacts_online_queue(self):
+ try:
+ callback = self._contacts_online_queue.pop(0)
+ except IndexError:
+ pass
+ else:
+ callback()
+ return False
def _buddy_validity_changed_cb(self, buddy, valid):
if valid:
@@ -296,13 +394,14 @@ class PresenceService(ExportedGObject):
if buddy.props.valid:
self._buddy_validity_changed_cb(buddy, False)
- def _contact_offline(self, tp, handle):
- buddy = self._handles_buddies[tp].pop(handle, None)
- # the handle of the buddy for this CM is not valid anymore
- # (this might trigger _buddy_disappeared_cb if they are not visible
- # via any CM)
- if buddy is not None:
- buddy.remove_telepathy_handle(tp)
+ def _contacts_offline(self, tp, handles):
+ for handle in handles:
+ buddy = self._handles_buddies[tp].pop(handle, None)
+ # the handle of the buddy for this CM is not valid anymore
+ # (this might trigger _buddy_disappeared_cb if they are not
+ # visible via any CM)
+ if buddy is not None:
+ buddy.remove_telepathy_handle(tp)
def _get_next_object_id(self):
"""Increment and return the object ID counter."""
diff --git a/src/server_plugin.py b/src/server_plugin.py
index 0b4cdd2..4d0c579 100644
--- a/src/server_plugin.py
+++ b/src/server_plugin.py
@@ -67,17 +67,16 @@ class ServerPlugin(gobject.GObject):
to implement the PresenceService.
"""
__gsignals__ = {
- 'contact-online':
- # Contact has come online and we've discovered all their buddy
- # properties.
+ 'contacts-online':
+ # Contacts in the subscribe list have come online.
# args:
- # contact identification (based on key ID or JID): str
- # contact handle: int or long
- # contact identifier (JID): str or unicode
- (gobject.SIGNAL_RUN_FIRST, None, [str, object, object]),
- 'contact-offline':
- # Contact has gone offline.
- # args: contact handle
+ # contact identification (based on key ID or JID): list of str
+ # contact handle: list of int or long
+ # contact identifier (JID): list of str or unicode
+ (gobject.SIGNAL_RUN_FIRST, None, [object, object, object]),
+ 'contacts-offline':
+ # Contacts in the subscribe list have gone offline.
+ # args: iterable over contact handles
(gobject.SIGNAL_RUN_FIRST, None, [object]),
'status':
# Connection status changed.
@@ -114,7 +113,8 @@ class ServerPlugin(gobject.GObject):
self._matches = []
self._registry = registry
- self._online_contacts = {} # handle -> jid
+ #: set of contact handles: those for whom we've emitted contacts-online
+ self._online_contacts = set()
self._owner = owner
self.self_handle = None
@@ -308,8 +308,7 @@ class ServerPlugin(gobject.GObject):
# FIXME: do this async?
self.self_handle = self._conn[CONN_INTERFACE].GetSelfHandle()
self.self_identifier = self._conn[CONN_INTERFACE].InspectHandles(
- HANDLE_TYPE_CONTACT, self.self_handle)[0]
- self._online_contacts[self.self_handle] = self.self_identifier
+ HANDLE_TYPE_CONTACT, [self.self_handle])[0]
# request subscriptions from people subscribed to us if we're not
# subscribed to them
@@ -417,21 +416,18 @@ class ServerPlugin(gobject.GObject):
self._conn = None
self._conn_status = CONNECTION_STATUS_DISCONNECTED
- for handle in self._online_contacts.keys():
- self._contact_offline(handle)
- self._online_contacts = {}
+ if self._online_contacts:
+ self._contacts_offline(self._online_contacts)
if self._reconnect_id > 0:
gobject.source_remove(self._reconnect_id)
self._reconnect_id = 0
- def _contact_offline(self, handle):
- """Handle contact going offline (send message, update set)"""
- if not self._online_contacts.has_key(handle):
- return
- if self._online_contacts[handle]:
- self.emit("contact-offline", handle)
- del self._online_contacts[handle]
+ def _contacts_offline(self, handles):
+ """Handle contacts going offline (send message, update set)"""
+ self._online_contacts -= handles
+ _logger.debug('Contacts now offline: %r', handles)
+ self.emit("contacts-offline", handles)
def _contacts_online(self, handles):
"""Handle contacts coming online"""
@@ -449,14 +445,22 @@ class ServerPlugin(gobject.GObject):
# else it's probably a channel-specific handle - can't create a
# Buddy object for those yet
+ if not relevant:
+ return
+
jids = self._conn[CONN_INTERFACE].InspectHandles(
HANDLE_TYPE_CONTACT, relevant)
- objids = self.identify_contacts(None, relevant, jids)
+ handle_to_objid = self.identify_contacts(None, relevant, jids)
+ objids = []
+ for handle in relevant:
+ objids.append(handle_to_objid[handle])
- for handle, jid, objid in izip(relevant, jids, objids):
- self._online_contacts[handle] = jid
- self.emit('contact-online', objid, handle, jid)
+ self._online_contacts |= frozenset(relevant)
+ _logger.debug('Contacts now online:')
+ for handle, objid in izip(relevant, objids):
+ _logger.debug(' %u .../%s', handle, objid)
+ self.emit('contacts-online', objids, relevant, jids)
def _subscribe_members_changed_cb(self, message, added, removed,
local_pending, remote_pending,
@@ -496,31 +500,24 @@ class ServerPlugin(gobject.GObject):
def _presence_update_cb(self, presence):
"""Send update for online/offline status of presence"""
- now_online = []
- now_offline = []
+ now_online = set()
+ now_offline = set(presence.iterkeys())
for handle in presence:
timestamp, statuses = presence[handle]
- online = handle in self._online_contacts
for status, params in statuses.items():
- if not online and status == "offline":
- # weren't online in the first place...
- continue
- jid = self._conn[CONN_INTERFACE].InspectHandles(
- HANDLE_TYPE_CONTACT, [handle])[0]
- olstr = "ONLINE"
- if not online: olstr = "OFFLINE"
- _logger.debug("Handle %s (%s) was %s, status now '%s'.",
- handle, jid, olstr, status)
- if not online and status in ["available", "away", "brb",
- "busy", "dnd", "xa"]:
- now_online.append(handle)
- elif status in ["offline", "invisible"]:
- now_offline.append(handle)
-
- self._contacts_online(now_online)
- for handle in now_offline:
- self._contact_offline(handle)
+ # FIXME: use correct logic involving the GetStatuses method
+ if status in ["available", "away", "brb", "busy", "dnd", "xa"]:
+ now_online.add(handle)
+ now_offline.discard(handle)
+
+ now_online -= self._online_contacts
+ now_offline &= self._online_contacts
+
+ if now_online:
+ self._contacts_online(now_online)
+ if now_offline:
+ self._contacts_offline(now_offline)
def _new_channel_cb(self, object_path, channel_type, handle_type, handle,
suppress_handler):