From 5057a0e6530cd5585186659a5ff410ba8954a04d Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Thu, 03 May 2007 14:43:57 +0000 Subject: Add a mock sharing read activity --- (limited to 'tests') diff --git a/tests/presence/mock-share-read.py b/tests/presence/mock-share-read.py new file mode 100755 index 0000000..2c9ad27 --- /dev/null +++ b/tests/presence/mock-share-read.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# Copyright (C) 2007, Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os, time, sys +import dbus, dbus.glib +import gobject + +from sugar.presence import presenceservice +from sugar.p2p import network + +class ReadHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler): + def translate_path(self, path): + return self.server._filepath + +class ReadHTTPServer(network.GlibTCPServer): + def __init__(self, server_address, request_handler, filepath): + self._filepath = filepath + network.GlibTCPServer.__init__(self, server_address, request_handler); + +class XMLRPCResponder(object): + def __init__(self, have_file=False): + self._have_file = have_file + + def _set_have_file(self): + self._have_file = True + + def have_file(self): + return self._have_file + + +class MockReadActivity(gobject.GObject): + __gproperties__ = { + 'title' : (str, None, None, None, gobject.PARAM_READABLE) + } + + def __init__(self, filepath): + self._actid = "ef60b3af42f7b5aa558ef9269e2ed7998798afc0" + self._name = "Test Read Activity" + self._type = "org.laptop.sugar.Xbook" + gobject.GObject.__init__(self) + + self._ps_act = None + self._filepath = os.path.abspath(filepath) + self._file_server = ReadHTTPServer(("", 8867), ReadHTTPRequestHandler, self._filepath) + + self._xmlrpc_server = network.GlibXMLRPCServer(("", 8868)) + responder = XMLRPCResponder(have_file=True) + self._xmlrpc_server.register_instance(responder) + + def _activity_appeared_cb(self, ps, activity): + if activity.props.id != self._actid: + return + self._ps_act = activity + + def share(self): + ps = presenceservice.get_instance() + ps.connect("activity-appeared", self._activity_appeared_cb) + ps.share_activity(self) + return False + + def do_get_property(self, pspec): + if pspec.name == "title": + return self._name + + def get_id(self): + return self._actid + + def get_service_name(self): + return self._type + +def start_ps(): + import commands + (s, o) = commands.getstatusoutput("which sugar-presence-service") + if s != 0: + raise RuntimeError("Failed to find sugar presence service: %s" % o) + argv = [o, "1"] + (pid, stdin, stdout, stderr) = gobject.spawn_async(argv, flags=gobject.SPAWN_LEAVE_DESCRIPTORS_OPEN) + + # Wait until it shows up on the bus + tries = 0 + bus = dbus.SessionBus() + while tries < 10: + time.sleep(0.5) + bus_object = bus.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus') + try: + if bus_object.GetNameOwner(presenceservice.DBUS_SERVICE, dbus_interface='org.freedesktop.DBus'): + break + except dbus.exceptions.DBusException, err: + pass + tries += 1 + + if tries >= 5: + stop_ps(pid) + raise RuntimeError("Couldn't start the mock presence service") + + print "Started presence service PID %d" % pid + return pid + + +def stop_ps(pid): + if pid >= 0: + os.kill(pid, 15) + print "Stopped presence service PID %d" % pid + +def main(): + if len(sys.argv) != 2: + raise RuntimeError("Must specify a PDF to share.") + path = os.path.abspath(sys.argv[1]) + if not os.path.exists(path): + raise RuntimeError("File %s doesn't exist." % path) + mact = MockReadActivity(path) + pid = start_ps() + loop = gobject.MainLoop() + gobject.timeout_add(2000, mact.share) + try: + loop.run() + except KeyboardInterrupt: + pass + stop_ps(pid) + +if __name__ == "__main__": + main() -- cgit v0.9.1