From 31000f6c3e73681163d39c3de0b0ff48437aabd1 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Fri, 13 Apr 2007 17:11:59 +0000 Subject: Asynchronize activity join/share in the PS --- (limited to 'services') diff --git a/services/presence/activity.py b/services/presence/activity.py index 42fed01..1c1e5c8 100644 --- a/services/presence/activity.py +++ b/services/presence/activity.py @@ -65,7 +65,7 @@ class Activity(DBusGObject): # the telepathy client self._tp = tp - self._activity_text_channel = None + self._text_channel = None self._valid = False self._id = None @@ -161,10 +161,10 @@ class Activity(DBusGObject): def GetType(self): return self.props.type - @dbus.service.method(_ACTIVITY_INTERFACE, - in_signature="", out_signature="") - def Join(self): - self.join() + @dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="", + async_callbacks=('async_cb', 'async_err_cb')) + def Join(self, async_cb, async_err_cb): + self.join(async_cb, async_err_cb) @dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="ao") @@ -208,24 +208,72 @@ class Activity(DBusGObject): if self.props.valid: self.BuddyLeft(buddy.object_path()) - def join(self): - if not self._joined: - self._activity_text_channel = self._tp.join_activity(self.props.id) - self._activity_text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed', self._activity_text_channel_closed_cb) - self._joined = True + def _handle_share_join(self, tp, text_channel): + if not text_channel: + logging.debug("Error sharing: text channel was None, shouldn't happen") + raise RuntimeError("Plugin returned invalid text channel") + + self._text_channel = text_channel + self._text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed', + self._text_channel_closed_cb) + self._joined = True + return True + + def _shared_cb(self, tp, activity_id, text_channel, exc, userdata): + if activity_id != self.props.id: + # Not for us + return + + (sigid, async_cb, async_err_cb) = userdata + self._tp.disconnect(sigid) + + if exc: + async_err_cb(exc) + else: + self._handle_share_join(tp, text_channel) + self.send_properties() + async_cb(dbus.ObjectPath(self._object_path)) + + def _share(self, (async_cb, async_err_cb)): + if self._joined: + async_err_cb(RuntimeError("Already shared activity %s" % self.props.id)) + return + sigid = self._tp.connect('activity-shared', self._shared_cb) + self._tp.share_activity(self.props.id, (sigid, async_cb, async_err_cb)) + + def _joined_cb(self, tp, activity_id, text_channel, exc, userdata): + if activity_id != self.props.id: + # Not for us + return + + (sigid, async_cb, async_err_cb) = userdata + self._tp.disconnect(sigid) + + if exc: + async_err_cb(exc) + else: + self._handle_share_join(tp, text_channel) + async_cb() + + def join(self, async_cb, async_err_cb): + if self._joined: + async_err_cb(RuntimeError("Already joined activity %s" % self.props.id)) + return + sigid = self._tp.connect('activity-joined', self._joined_cb) + self._tp.join_activity(self.props.id, (sigid, async_cb, async_err_cb)) def get_channels(self): conn = self._tp.get_connection() # FIXME add tubes and others channels - return str(conn.service_name), conn.object_path, [self._activity_text_channel.object_path] + return str(conn.service_name), conn.object_path, [self._text_channel.object_path] def leave(self): if self._joined: - self._activity_text_channel[CHANNEL_INTERFACE].Close() + self._text_channel[CHANNEL_INTERFACE].Close() - def _activity_text_channel_closed_cb(self): + def _text_channel_closed_cb(self): self._joined = False - self._activity_text_channel = None + self._text_channel = None def send_properties(self): props = {} diff --git a/services/presence/presenceservice.py b/services/presence/presenceservice.py index cbf2606..b770ffd 100644 --- a/services/presence/presenceservice.py +++ b/services/presence/presenceservice.py @@ -84,6 +84,13 @@ class PresenceService(dbus.service.Object): dbus.service.Object.__init__(self, self._bus_name, _PRESENCE_PATH) + def _activity_shared_cb(self, tp, activity, success, exc, async_cb, async_err_cb): + if success: + async_cb(activity.object_path()) + else: + del self._activities[activity.props.id] + async_err_cb(exc) + def _server_status_cb(self, plugin, status, reason): if status == CONNECTION_STATUS_CONNECTED: pass @@ -290,32 +297,24 @@ class PresenceService(dbus.service.Object): else: return self._owner.get_object_path() - @dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}", out_signature="o") - def ShareActivity(self, actid, atype, name, properties): - activity = self._share_activity(actid, atype, name, properties) - return activity.object_path() + @dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}", + out_signature="o", async_callbacks=('async_cb', 'async_err_cb')) + def ShareActivity(self, actid, atype, name, properties, async_cb, async_err_cb): + self._share_activity(actid, atype, name, properties, (async_cb, async_err_cb)) def cleanup(self): for tp in self._handles_buddies: tp.cleanup() - def _share_activity(self, actid, atype, name, properties): + def _share_activity(self, actid, atype, name, properties, callbacks): objid = self._get_next_object_id() # FIXME check which tp client we should use to share the activity - import time - start = time.time() - logging.debug("Start share of %s (%s)" % (actid, atype)) color = self._owner.props.color activity = Activity(self._bus_name, objid, self._server_plugin, id=actid, type=atype, name=name, color=color, local=True) activity.connect("validity-changed", self._activity_validity_changed_cb) self._activities[actid] = activity - - activity.join() - activity.send_properties() - logging.debug("End share of %s (%s). Time: %f" % (actid, atype, (float)(time.time() - start))) - - return activity + activity._share(callbacks) def _activity_validity_changed_cb(self, activity, valid): if valid: diff --git a/services/presence/server_plugin.py b/services/presence/server_plugin.py index b7210aa..ea33e70 100644 --- a/services/presence/server_plugin.py +++ b/services/presence/server_plugin.py @@ -95,6 +95,12 @@ class ServerPlugin(gobject.GObject): ([gobject.TYPE_PYOBJECT])), 'activity-properties-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])), + 'activity-shared': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, + ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, + gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])), + 'activity-joined': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, + ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT, + gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])) } def __init__(self, registry, owner): @@ -289,26 +295,46 @@ class ServerPlugin(gobject.GObject): reply_handler=self._set_self_avatar_cb, error_handler=lambda *args: self._log_error_cb("avatar", *args)) - def join_activity(self, act): - handle = self._activities.get(act) + def _join_activity_create_channel_cb(self, activity_id, signal, handle, userdata, chan_path): + channel = Channel(self._conn._dbus_object._named_service, chan_path) + self._joined_activities.append((activity_id, handle)) + self._set_self_activities() + self.emit(signal, activity_id, channel, None, userdata) - if not handle: - handle = self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [act])[0] - self._activities[act] = handle + def _join_activity_get_channel_cb(self, activity_id, signal, userdata, handles): + if not self._activities.has_key(activity_id): + self._activities[activity_id] = handles[0] - if (act, handle) in self._joined_activities: - logging.debug("Already joined %s" % act) + if (activity_id, handles[0]) in self._joined_activities: + e = RuntimeError("Already joined activity %s" % activity_id) + logging.debug(str(e)) + self.emit(signal, activity_id, None, e, userdata) return - chan_path = self._conn[CONN_INTERFACE].RequestChannel( - CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM, - handle, True) - channel = Channel(self._conn._dbus_object._named_service, chan_path) + self._conn[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_TEXT, + CONNECTION_HANDLE_TYPE_ROOM, handles[0], True, + reply_handler=lambda *args: self._join_activity_create_channel_cb(activity_id, signal, handle, userdata, *args), + error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args)) - self._joined_activities.append((act, handle)) - self._conn[CONN_INTERFACE_BUDDY_INFO].SetActivities(self._joined_activities) - - return channel + def _join_error_cb(self, activity_id, signal, userdata, err): + e = Exception("Error joining/sharing activity %s: %s" % (activity_id, err)) + logging.debug(str(e)) + self.emit(signal, activity_id, None, e, userdata) + + def _internal_join_activity(self, activity_id, signal, userdata): + handle = self._activities.get(activity_id) + if not handle: + self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [activity_id], + reply_handler=lambda *args: self._join_activity_get_channel_cb(activity_id, signal, userdata, *args), + error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args)) + else: + self._join_activity_get_channel_cb(activity_id, userdata, [handle]) + + def share_activity(self, activity_id, userdata): + self._internal_join_activity(activity_id, "activity-shared", userdata) + + def join_activity(self, activity_id, userdata): + self._internal_join_activity(activity_id, "activity-joined", userdata) def _ignore_success_cb(self): pass -- cgit v0.9.1