Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/tests/presence/mock-share-read.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/presence/mock-share-read.py')
-rwxr-xr-xtests/presence/mock-share-read.py136
1 files changed, 136 insertions, 0 deletions
diff --git a/tests/presence/mock-share-read.py b/tests/presence/mock-share-read.py
new file mode 100755
index 0000000..2c9ad27
--- /dev/null
+++ b/tests/presence/mock-share-read.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# Copyright (C) 2007, Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os, time, sys
+import dbus, dbus.glib
+import gobject
+
+from sugar.presence import presenceservice
+from sugar.p2p import network
+
+class ReadHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler):
+ def translate_path(self, path):
+ return self.server._filepath
+
+class ReadHTTPServer(network.GlibTCPServer):
+ def __init__(self, server_address, request_handler, filepath):
+ self._filepath = filepath
+ network.GlibTCPServer.__init__(self, server_address, request_handler);
+
+class XMLRPCResponder(object):
+ def __init__(self, have_file=False):
+ self._have_file = have_file
+
+ def _set_have_file(self):
+ self._have_file = True
+
+ def have_file(self):
+ return self._have_file
+
+
+class MockReadActivity(gobject.GObject):
+ __gproperties__ = {
+ 'title' : (str, None, None, None, gobject.PARAM_READABLE)
+ }
+
+ def __init__(self, filepath):
+ self._actid = "ef60b3af42f7b5aa558ef9269e2ed7998798afc0"
+ self._name = "Test Read Activity"
+ self._type = "org.laptop.sugar.Xbook"
+ gobject.GObject.__init__(self)
+
+ self._ps_act = None
+ self._filepath = os.path.abspath(filepath)
+ self._file_server = ReadHTTPServer(("", 8867), ReadHTTPRequestHandler, self._filepath)
+
+ self._xmlrpc_server = network.GlibXMLRPCServer(("", 8868))
+ responder = XMLRPCResponder(have_file=True)
+ self._xmlrpc_server.register_instance(responder)
+
+ def _activity_appeared_cb(self, ps, activity):
+ if activity.props.id != self._actid:
+ return
+ self._ps_act = activity
+
+ def share(self):
+ ps = presenceservice.get_instance()
+ ps.connect("activity-appeared", self._activity_appeared_cb)
+ ps.share_activity(self)
+ return False
+
+ def do_get_property(self, pspec):
+ if pspec.name == "title":
+ return self._name
+
+ def get_id(self):
+ return self._actid
+
+ def get_service_name(self):
+ return self._type
+
+def start_ps():
+ import commands
+ (s, o) = commands.getstatusoutput("which sugar-presence-service")
+ if s != 0:
+ raise RuntimeError("Failed to find sugar presence service: %s" % o)
+ argv = [o, "1"]
+ (pid, stdin, stdout, stderr) = gobject.spawn_async(argv, flags=gobject.SPAWN_LEAVE_DESCRIPTORS_OPEN)
+
+ # Wait until it shows up on the bus
+ tries = 0
+ bus = dbus.SessionBus()
+ while tries < 10:
+ time.sleep(0.5)
+ bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
+ try:
+ if bus_object.GetNameOwner(presenceservice.DBUS_SERVICE, dbus_interface='org.freedesktop.DBus'):
+ break
+ except dbus.exceptions.DBusException, err:
+ pass
+ tries += 1
+
+ if tries >= 5:
+ stop_ps(pid)
+ raise RuntimeError("Couldn't start the mock presence service")
+
+ print "Started presence service PID %d" % pid
+ return pid
+
+
+def stop_ps(pid):
+ if pid >= 0:
+ os.kill(pid, 15)
+ print "Stopped presence service PID %d" % pid
+
+def main():
+ if len(sys.argv) != 2:
+ raise RuntimeError("Must specify a PDF to share.")
+ path = os.path.abspath(sys.argv[1])
+ if not os.path.exists(path):
+ raise RuntimeError("File %s doesn't exist." % path)
+ mact = MockReadActivity(path)
+ pid = start_ps()
+ loop = gobject.MainLoop()
+ gobject.timeout_add(2000, mact.share)
+ try:
+ loop.run()
+ except KeyboardInterrupt:
+ pass
+ stop_ps(pid)
+
+if __name__ == "__main__":
+ main()