Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/services
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2007-04-13 17:11:59 (GMT)
committer Dan Williams <dcbw@redhat.com>2007-04-13 17:11:59 (GMT)
commit31000f6c3e73681163d39c3de0b0ff48437aabd1 (patch)
treeb6af0de60db98c3a6c84a2b9b270773cf1ef3a5d /services
parent7b40f9bf60a0529e0fb06f9b6065c8b429009f52 (diff)
Asynchronize activity join/share in the PS
Diffstat (limited to 'services')
-rw-r--r--services/presence/activity.py76
-rw-r--r--services/presence/presenceservice.py27
-rw-r--r--services/presence/server_plugin.py56
3 files changed, 116 insertions, 43 deletions
diff --git a/services/presence/activity.py b/services/presence/activity.py
index 42fed01..1c1e5c8 100644
--- a/services/presence/activity.py
+++ b/services/presence/activity.py
@@ -65,7 +65,7 @@ class Activity(DBusGObject):
# the telepathy client
self._tp = tp
- self._activity_text_channel = None
+ self._text_channel = None
self._valid = False
self._id = None
@@ -161,10 +161,10 @@ class Activity(DBusGObject):
def GetType(self):
return self.props.type
- @dbus.service.method(_ACTIVITY_INTERFACE,
- in_signature="", out_signature="")
- def Join(self):
- self.join()
+ @dbus.service.method(_ACTIVITY_INTERFACE, in_signature="", out_signature="",
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def Join(self, async_cb, async_err_cb):
+ self.join(async_cb, async_err_cb)
@dbus.service.method(_ACTIVITY_INTERFACE,
in_signature="", out_signature="ao")
@@ -208,24 +208,72 @@ class Activity(DBusGObject):
if self.props.valid:
self.BuddyLeft(buddy.object_path())
- def join(self):
- if not self._joined:
- self._activity_text_channel = self._tp.join_activity(self.props.id)
- self._activity_text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed', self._activity_text_channel_closed_cb)
- self._joined = True
+ def _handle_share_join(self, tp, text_channel):
+ if not text_channel:
+ logging.debug("Error sharing: text channel was None, shouldn't happen")
+ raise RuntimeError("Plugin returned invalid text channel")
+
+ self._text_channel = text_channel
+ self._text_channel[CHANNEL_INTERFACE].connect_to_signal('Closed',
+ self._text_channel_closed_cb)
+ self._joined = True
+ return True
+
+ def _shared_cb(self, tp, activity_id, text_channel, exc, userdata):
+ if activity_id != self.props.id:
+ # Not for us
+ return
+
+ (sigid, async_cb, async_err_cb) = userdata
+ self._tp.disconnect(sigid)
+
+ if exc:
+ async_err_cb(exc)
+ else:
+ self._handle_share_join(tp, text_channel)
+ self.send_properties()
+ async_cb(dbus.ObjectPath(self._object_path))
+
+ def _share(self, (async_cb, async_err_cb)):
+ if self._joined:
+ async_err_cb(RuntimeError("Already shared activity %s" % self.props.id))
+ return
+ sigid = self._tp.connect('activity-shared', self._shared_cb)
+ self._tp.share_activity(self.props.id, (sigid, async_cb, async_err_cb))
+
+ def _joined_cb(self, tp, activity_id, text_channel, exc, userdata):
+ if activity_id != self.props.id:
+ # Not for us
+ return
+
+ (sigid, async_cb, async_err_cb) = userdata
+ self._tp.disconnect(sigid)
+
+ if exc:
+ async_err_cb(exc)
+ else:
+ self._handle_share_join(tp, text_channel)
+ async_cb()
+
+ def join(self, async_cb, async_err_cb):
+ if self._joined:
+ async_err_cb(RuntimeError("Already joined activity %s" % self.props.id))
+ return
+ sigid = self._tp.connect('activity-joined', self._joined_cb)
+ self._tp.join_activity(self.props.id, (sigid, async_cb, async_err_cb))
def get_channels(self):
conn = self._tp.get_connection()
# FIXME add tubes and others channels
- return str(conn.service_name), conn.object_path, [self._activity_text_channel.object_path]
+ return str(conn.service_name), conn.object_path, [self._text_channel.object_path]
def leave(self):
if self._joined:
- self._activity_text_channel[CHANNEL_INTERFACE].Close()
+ self._text_channel[CHANNEL_INTERFACE].Close()
- def _activity_text_channel_closed_cb(self):
+ def _text_channel_closed_cb(self):
self._joined = False
- self._activity_text_channel = None
+ self._text_channel = None
def send_properties(self):
props = {}
diff --git a/services/presence/presenceservice.py b/services/presence/presenceservice.py
index cbf2606..b770ffd 100644
--- a/services/presence/presenceservice.py
+++ b/services/presence/presenceservice.py
@@ -84,6 +84,13 @@ class PresenceService(dbus.service.Object):
dbus.service.Object.__init__(self, self._bus_name, _PRESENCE_PATH)
+ def _activity_shared_cb(self, tp, activity, success, exc, async_cb, async_err_cb):
+ if success:
+ async_cb(activity.object_path())
+ else:
+ del self._activities[activity.props.id]
+ async_err_cb(exc)
+
def _server_status_cb(self, plugin, status, reason):
if status == CONNECTION_STATUS_CONNECTED:
pass
@@ -290,32 +297,24 @@ class PresenceService(dbus.service.Object):
else:
return self._owner.get_object_path()
- @dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}", out_signature="o")
- def ShareActivity(self, actid, atype, name, properties):
- activity = self._share_activity(actid, atype, name, properties)
- return activity.object_path()
+ @dbus.service.method(_PRESENCE_INTERFACE, in_signature="sssa{sv}",
+ out_signature="o", async_callbacks=('async_cb', 'async_err_cb'))
+ def ShareActivity(self, actid, atype, name, properties, async_cb, async_err_cb):
+ self._share_activity(actid, atype, name, properties, (async_cb, async_err_cb))
def cleanup(self):
for tp in self._handles_buddies:
tp.cleanup()
- def _share_activity(self, actid, atype, name, properties):
+ def _share_activity(self, actid, atype, name, properties, callbacks):
objid = self._get_next_object_id()
# FIXME check which tp client we should use to share the activity
- import time
- start = time.time()
- logging.debug("Start share of %s (%s)" % (actid, atype))
color = self._owner.props.color
activity = Activity(self._bus_name, objid, self._server_plugin,
id=actid, type=atype, name=name, color=color, local=True)
activity.connect("validity-changed", self._activity_validity_changed_cb)
self._activities[actid] = activity
-
- activity.join()
- activity.send_properties()
- logging.debug("End share of %s (%s). Time: %f" % (actid, atype, (float)(time.time() - start)))
-
- return activity
+ activity._share(callbacks)
def _activity_validity_changed_cb(self, activity, valid):
if valid:
diff --git a/services/presence/server_plugin.py b/services/presence/server_plugin.py
index b7210aa..ea33e70 100644
--- a/services/presence/server_plugin.py
+++ b/services/presence/server_plugin.py
@@ -95,6 +95,12 @@ class ServerPlugin(gobject.GObject):
([gobject.TYPE_PYOBJECT])),
'activity-properties-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])),
+ 'activity-shared': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
+ ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT,
+ gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT])),
+ 'activity-joined': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
+ ([gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT,
+ gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT]))
}
def __init__(self, registry, owner):
@@ -289,26 +295,46 @@ class ServerPlugin(gobject.GObject):
reply_handler=self._set_self_avatar_cb,
error_handler=lambda *args: self._log_error_cb("avatar", *args))
- def join_activity(self, act):
- handle = self._activities.get(act)
+ def _join_activity_create_channel_cb(self, activity_id, signal, handle, userdata, chan_path):
+ channel = Channel(self._conn._dbus_object._named_service, chan_path)
+ self._joined_activities.append((activity_id, handle))
+ self._set_self_activities()
+ self.emit(signal, activity_id, channel, None, userdata)
- if not handle:
- handle = self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [act])[0]
- self._activities[act] = handle
+ def _join_activity_get_channel_cb(self, activity_id, signal, userdata, handles):
+ if not self._activities.has_key(activity_id):
+ self._activities[activity_id] = handles[0]
- if (act, handle) in self._joined_activities:
- logging.debug("Already joined %s" % act)
+ if (activity_id, handles[0]) in self._joined_activities:
+ e = RuntimeError("Already joined activity %s" % activity_id)
+ logging.debug(str(e))
+ self.emit(signal, activity_id, None, e, userdata)
return
- chan_path = self._conn[CONN_INTERFACE].RequestChannel(
- CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM,
- handle, True)
- channel = Channel(self._conn._dbus_object._named_service, chan_path)
+ self._conn[CONN_INTERFACE].RequestChannel(CHANNEL_TYPE_TEXT,
+ CONNECTION_HANDLE_TYPE_ROOM, handles[0], True,
+ reply_handler=lambda *args: self._join_activity_create_channel_cb(activity_id, signal, handle, userdata, *args),
+ error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args))
- self._joined_activities.append((act, handle))
- self._conn[CONN_INTERFACE_BUDDY_INFO].SetActivities(self._joined_activities)
-
- return channel
+ def _join_error_cb(self, activity_id, signal, userdata, err):
+ e = Exception("Error joining/sharing activity %s: %s" % (activity_id, err))
+ logging.debug(str(e))
+ self.emit(signal, activity_id, None, e, userdata)
+
+ def _internal_join_activity(self, activity_id, signal, userdata):
+ handle = self._activities.get(activity_id)
+ if not handle:
+ self._conn[CONN_INTERFACE].RequestHandles(CONNECTION_HANDLE_TYPE_ROOM, [activity_id],
+ reply_handler=lambda *args: self._join_activity_get_channel_cb(activity_id, signal, userdata, *args),
+ error_handler=lambda *args: self._join_error_cb(activity_id, signal, userdata, *args))
+ else:
+ self._join_activity_get_channel_cb(activity_id, userdata, [handle])
+
+ def share_activity(self, activity_id, userdata):
+ self._internal_join_activity(activity_id, "activity-shared", userdata)
+
+ def join_activity(self, activity_id, userdata):
+ self._internal_join_activity(activity_id, "activity-joined", userdata)
def _ignore_success_cb(self):
pass