From 91757a5b113841e2c1a465c184e111aeac8fd1d6 Mon Sep 17 00:00:00 2001 From: Vincent Vinet Date: Thu, 29 Oct 2009 14:37:57 +0000 Subject: add the probe tests --- diff --git a/tests/probetests.py b/tests/probetests.py index a440334..3690a25 100644 --- a/tests/probetests.py +++ b/tests/probetests.py @@ -19,45 +19,471 @@ Probe Tests """ import unittest -import os, sys -import gtk -import time +import pickle from dbus.mainloop.glib import DBusGMainLoop +from dbus.mainloop import NULL_MAIN_LOOP import dbus -from sugar.tutorius.TProbe import TProbe, ProbeProxy +from sugar.tutorius.TProbe import TProbe, ProbeProxy, ProbeManager +from sugar.tutorius import addon +from sugar.tutorius.actions import Action +from sugar.tutorius.properties import TIntProperty, TStringProperty -class FakeActivity(object): - def __init__(self): - self.top = gtk.Window(type=gtk.WINDOW_TOPLEVEL) - self.top.set_name("Top") +#Create a substitute addon create function +old_addon_create = addon.create +fake_addon_cache = {} +def new_addon_create(name, *args, **kwargs): + if name in fake_addon_cache: + return fake_addon_cache[name](*args, **kwargs) + else: + return old_addon_create(name, *args, **kwargs) + +message_box = None +event_box = None + +class MockAddon(Action): + i = TIntProperty(0) + s = TStringProperty("test") + + def do(self): + global message_box + message_box = (self.i, self.s) + + def undo(self): + global message_box + message_box = None + + def install_handlers(self, callback, **kwargs): + global message_box + message_box = callback + + def remove_handlers(self): + global message_box + message_box = None + +fake_addon_cache["MockAddon"] = MockAddon + +class MockActivity(object): + pass + +class MockProbeProxy(object): + _MockProxyCache = {} + def __new__(cls, activityName): + #For testing, use only one instance per activityName + return cls._MockProxyCache.setdefault(activityName, super(MockProbeProxy, cls).__new__(cls)) + + def __init__(self, activityName): + """ + Constructor + @param activityName unique activity id. Must be a valid dbus bus name. + """ + self.MockAction = None + self.MockActionUpdate = None + self.MockEvent = None + self.MockCB = None + self.MockAlive = True + self.MockEventAddr = None - hbox = gtk.HBox() - self.top.add(hbox) - hbox.show() + def isAlive(self): + return self.MockAlive + + def install(self, action, block=False): + self.MockAction = action + self.MockActionUpdate = None + return None + + def update(self, action, newaction, block=False): + self.MockAction = action + self.MockActionUpdate = newaction + return None - btn1 = gtk.Button() - btn1.set_name("Button1") - hbox.pack_start(btn1) - btn1.show() - self.button = btn1 + def uninstall(self, action, block=False): + self.MockAction = None + self.MockActionUpdate = None + return None + def subscribe(self, event, callback, block=True): + #Do like the current Probe + if not block: + raise RuntimeError("This function does not allow non-blocking mode yet") + + self.MockEvent= event + self.MockCB = callback + return str(id(event)) + + def unsubscribe(self, address, block=True): + self.MockEventAddr = address + return None + + def detach(self, block=False): + self.MockAction = None + self.MockActionUpdate = None + self.MockEvent = None + self.MockCB = None + self.MockAlive = False + self.MockEventAddr = None + return None + +class MockProxyObject(object): + _MockProxyObjects = {} + def __new__(cls, name, path): + return cls._MockProxyObjects.setdefault((name, path), super(MockProxyObject, cls).__new__(cls)) + + def __init__(self, name, path): + self.MockCall = {} + self.MockRet = {} + self.MockCB = {} + + def get_dbus_method(self, name, *args, **kwargs): + #FIXME This mockMethod should support asynchronous calling, + # and possibly more + def mockMethod(*a, **kw): + self.MockCall[name] = dict(args=a, kwargs=kw) + return self.MockRet.get(name, None) + return mockMethod + + def connect_to_signal(self, signal_name, handler_function, dbus_interface=None, **kw): + self.MockCB[signal_name] = dict(handler_function=handler_function, dbus_interface=dbus_interface, **kw) + +class MockSessionBus(object): + def get_object(self, bus_name, object_path, introspect=True, follow_name_owner_changes=False, **kwargs): + return MockProxyObject(bus_name, object_path) + +old_SessionBus = dbus.SessionBus + +########################################################################### +# Begin Test Cases +########################################################################### class ProbeTest(unittest.TestCase): - def test_ping(self): + def setUp(self): + global message_box + message_box = None + + #Fix the addon create + addon.create = new_addon_create + + #Set a default dbus mainloop m = DBusGMainLoop(set_as_default=True) dbus.set_default_main_loop(m) - activity = FakeActivity() - probe = TProbe("localhost.unittest.ProbeTest", activity.top) + #Setup the activity and probe + self.activity = MockActivity() + self.probe = TProbe("localhost.unittest.ProbeTest", self.activity) - #Parent, ping the probe - proxy = ProbeProxy("localhost.unittest.ProbeTest") - res = probe.ping() - + #Override the eventOccured on the Probe... + self.old_eO = self.probe.eventOccured + def newEo(event): + global event_box + try: + self.old_eO(event) + event_box = event + except RuntimeError: + event_box = None + + self.probe.eventOccured = newEo + + def tearDown(self): + #Replace addon create + addon.create = old_addon_create + + #Clear the default dbus mainloop + dbus.set_default_main_loop(NULL_MAIN_LOOP) + + #Clear the activity + self.probe.remove_from_connection() + del self.probe + del self.activity + + def test_ping(self): + #Test ping() + res = self.probe.ping() assert res == "alive", "Probe should be alive" + def test_action(self): + global message_box + action = MockAddon() + action.i, action.s = (5,"woot") + + assert message_box is None, "Message box should still be empty" + + #install 1 + address = self.probe.install(pickle.dumps(action)) + assert type(address) == str, "install should return a string" + assert message_box == (5, "woot"), "message box should have (i, s)" + + #install 2 + action.i, action.s = (10, "ahhah!") + address2 = self.probe.install(pickle.dumps(action)) + assert message_box == (10, "ahhah!"), "message box should have changed" + assert address != address2, "action addresses should be different" + + #uninstall 2 + self.probe.uninstall(address2) + assert message_box is None, "undo should clear the message box" + + #update action 1 with action 2 props + self.probe.update(address, pickle.dumps(action._props)) + assert message_box == (10, "ahhah!"), "message box should have changed(i, s)" + + #try to update 2, should fail + self.assertRaises(KeyError, self.probe.update, address2, pickle.dumps(action._props)) + + self.probe.uninstall(address) + assert message_box is None, "undo should clear the message box" + + message_box = "Test" + #Uninstall twice should do nothing + self.probe.uninstall(address) + assert message_box == "Test", "undo should not have happened again" + + def test_events(self): + global message_box + global event_box + + event = MockAddon() + event.i, event.s = (0, "event1") + event2 = MockAddon() + event2.i, event2.s = (1, "event2") + + addr = self.probe.subscribe(pickle.dumps(event)) + cb1 = message_box + addr2 = self.probe.subscribe(pickle.dumps(event2)) + cb2 = message_box + assert type(addr) == str, "should return a string address" + assert addr != addr2, "each subscribe should return a different address" + + assert event_box is None, "event_box should still be empty" + #Do the callback 2 + cb2() + + assert event_box is not None, "event_box should have an event" + + assert type(event_box) == str, "event should be pickled" + assert pickle.loads(event_box) == event2, "event should be event2" + + #Unsubscribe event 2 + self.probe.unsubscribe(addr2) + assert message_box is None, "unsubscribe should clear the message_box" + + #Do the callback 1 + cb1() + assert pickle.loads(event_box) == event, "event should be event1" + + #unsubscribe event 1 + self.probe.unsubscribe(addr) + assert message_box is None, "unsubscribe should clear the message_box" + + event_box = None + #Do the callback 1 again + self.assertRaises(RuntimeWarning, cb1) + +class ProbeManagerTest(unittest.TestCase): + def setUp(self): + MockProbeProxy._MockProxyCache = {} + self.probeManager = ProbeManager(proxy_class=MockProbeProxy) + + def test_attach(self): + #Attempt to set to a non existing activity + try: + self.probeManager.currentActivity = "act1" + assert False, "Exception expected" + except RuntimeError, e: + pass + + #Attach an activity + self.probeManager.attach("act1") + + #Should have been created + assert "act1" in MockProbeProxy._MockProxyCache.keys(), "Proxy not created" + act1 = MockProbeProxy("act1") + + #Try to attach again + self.assertRaises(RuntimeWarning, self.probeManager.attach, "act1") + + #Set current activity should work + self.probeManager.currentActivity = "act1" + + #TODO Fill in the alive/notalive behavior at creation time once + # it is fixed in the ProbeManager + + def test_detach(self): + #attach an activity + self.probeManager.attach("act1") + self.probeManager.currentActivity = "act1" + act1 = MockProbeProxy("act1") + + #Now we detach + self.probeManager.detach("act1") + assert act1.MockAlive == False, "ProbeProxy should have been detached" + assert self.probeManager.currentActivity is None, "Current activity should be None" + + #Attempt to detach again, should do nothing + self.probeManager.detach("act1") + + #Now, attach 2 activities + self.probeManager.attach("act2") + self.probeManager.attach("act3") + act2 = MockProbeProxy("act2") + act3 = MockProbeProxy("act3") + + self.probeManager.currentActivity = "act2" + + assert act2.MockAlive and act3.MockAlive, "Both ProbeProxy instances should be alive" + + #Detach the not active activity + self.probeManager.detach("act3") + #Check the statuses + assert act2.MockAlive and not act3.MockAlive, "Only act2 should be alive" + assert self.probeManager.currentActivity == "act2", "act2 should not have failed" + + def test_actions(self): + self.probeManager.attach("act1") + self.probeManager.attach("act2") + act1 = MockProbeProxy("act1") + act2 = MockProbeProxy("act2") + + ad1 = MockAddon() + #Action functions should do a warning if there is no activity + self.assertRaises(RuntimeWarning, self.probeManager.install, ad1) + self.assertRaises(RuntimeWarning, self.probeManager.update, ad1, ad1) + self.assertRaises(RuntimeWarning, self.probeManager.uninstall, ad1) + assert act1.MockAction is None, "Action should not be installed on inactive proxy" + assert act1.MockAction is None, "Action should not be installed on inactive proxy" + + self.probeManager.currentActivity = "act1" + self.probeManager.install(ad1) + assert act1.MockAction == ad1, "Action should have been installed" + assert act2.MockAction is None, "Action should not be installed on inactive proxy" + + self.probeManager.update(ad1, ad1) + assert act1.MockActionUpdate == ad1, "Action should have been updated" + assert act2.MockActionUpdate is None, "Should not update on inactive" + + self.probeManager.currentActivity = "act2" + self.probeManager.uninstall(ad1) + assert act1.MockAction == ad1, "Action should still be installed" + + self.probeManager.currentActivity = "act1" + self.probeManager.uninstall(ad1) + assert act1.MockAction is None, "Action should be uninstalled" + + def test_events(self): + self.probeManager.attach("act1") + self.probeManager.attach("act2") + act1 = MockProbeProxy("act1") + act2 = MockProbeProxy("act2") + + ad1 = MockAddon() + ad2 = MockAddon() + ad2.i, ad2.s = (2, "test2") + + cb1 = lambda *args: None + cb2 = lambda *args: None + + #Event functions should do a warning if there is no activity + self.assertRaises(RuntimeWarning, self.probeManager.subscribe, ad1, cb1) + self.assertRaises(RuntimeWarning, self.probeManager.unsubscribe, None) + assert act1.MockEvent is None, "No event should be on act1" + assert act2.MockEvent is None, "No event should be on act2" + + self.probeManager.currentActivity = "act1" + self.probeManager.subscribe(ad1, cb1) + assert act1.MockEvent == ad1, "Event should have been installed" + assert act1.MockCB == cb1, "Callback should have been set" + assert act2.MockEvent is None, "No event should be on act2" + + self.probeManager.unsubscribe("SomeAddress") + assert act1.MockEventAddr == "SomeAddress", "Unsubscrive should have been called" + assert act2.MockEventAddr is None, "Unsubscrive should not have been called" + +class ProbeProxyTest(unittest.TestCase): + def setUp(self): + dbus.SessionBus = MockSessionBus + + self.mockObj = MockProxyObject("unittest.TestCase", "/tutorius/Probe") + self.probeProxy = ProbeProxy("unittest.TestCase") + + def tearDown(self): + dbus.SessionBus = old_SessionBus + MockProxyObject._MockProxyObjects = {} + + def test_Alive(self): + self.mockObj.MockRet["ping"] = "alive" + assert self.probeProxy.isAlive() == True, "Alive should return True" + + self.mockObj.MockRet["ping"] = "anything else" + assert self.probeProxy.isAlive() == False, "Alive should return False" + + def test_actions(self): + action = MockAddon() + action.i, action.s = 5, "action" + action2 = MockAddon() + action2.i, action2.s = 10, "action2" + + #Check if the installed action is the good one + address = "Addr1" + #Set the return value of probe install + self.mockObj.MockRet["install"] = address + self.probeProxy.install(action, block=True) + assert pickle.loads(self.mockObj.MockCall["install"]["args"][0]) == action, "1 argument, the action" + + #Update should fail on noninstalled actions + self.assertRaises(RuntimeWarning, self.probeProxy.update, action2, action2, block=True) + + #Test the update + self.probeProxy.update(action, action2, block=True) + args = self.mockObj.MockCall["update"]["args"] + assert args[0] == address, "arg 1 should be the action address" + assert pickle.loads(args[1]) == action2._props, "arg2 should be the new action properties" + + #Test the uninstall + self.probeProxy.uninstall(action2, block=True) + assert not "uninstall" in self.mockObj.MockCall, "Uninstall should not be called if action is not installed" + + self.probeProxy.uninstall(action, block=True) + assert self.mockObj.MockCall["uninstall"]["args"][0] == address, "1 argument, the action address" + + def test_events(self): + event = MockAddon() + event.i, event.s = 5, "event" + event2 = MockAddon() + event2.i, event2.s = 10, "event2" + + def callback(event): + global message_box + message_box = event + + #Check if the installed event is the good one + address = "Addr1" + #Set the return value of probe subscribe + self.mockObj.MockRet["subscribe"] = address + self.probeProxy.subscribe(event, callback, block=True) + assert pickle.loads(self.mockObj.MockCall["subscribe"]["args"][0]) == event, "1 argument, the event" + + #Call the callback with the event + global message_box + self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event)) + assert message_box == event, "callback should have been called with event" + message_box = None + + #Call with a wrong event + self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event2)) + assert message_box is None, "callback should not have been called" + + + #Test the unsubscribe + self.probeProxy.unsubscribe("otheraddress", block=True) + assert not "unsubscribe" in self.mockObj.MockCall, "Unsubscribe should not be called if event is not subscribeed" + + self.probeProxy.unsubscribe(address, block=True) + assert self.mockObj.MockCall["unsubscribe"]["args"][0] == address, "1 argument, the event address" + + #Test the callback with unregistered event + self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event)) + assert message_box is None, "callback should not have been called" + if __name__ == "__main__": unittest.main() diff --git a/tutorius/TProbe.py b/tutorius/TProbe.py index 867ef1c..afeba6d 100644 --- a/tutorius/TProbe.py +++ b/tutorius/TProbe.py @@ -195,7 +195,11 @@ class TProbe(dbus.service.Object): # The actual method we will call on the probe to send events def notify(self, event): LOGGER.debug("TProbe :: notify event %s", str(event)) - self.eventOccured(pickle.dumps(event)) + #Check that this event is even allowed + if event in self._subscribedEvents.values(): + self.eventOccured(pickle.dumps(event)) + else: + raise RuntimeWarning("Attempted to raise an unregistered event") # Return a unique name for this action def _generate_action_reference(self, action): @@ -400,8 +404,8 @@ class ProbeProxy: Detach the ProbeProxy from it's TProbe. All installed actions and subscribed events should be removed. """ - for action in self._actions.keys(): - self.uninstall(action, block) + for action_addr in self._actions.keys(): + self.uninstall(action_addr, block) for address in self._subscribedEvents.keys(): self.unsubscribe(address, block) @@ -414,7 +418,12 @@ class ProbeManager(object): For now, it only handles one at a time, though. Actually it doesn't do much at all. But it keeps your encapsulation happy """ - def __init__(self): + def __init__(self, proxy_class=ProbeProxy): + """Constructor + @param proxy_class Class to use for creating Proxies to activities. + The class should support the same interface as ProbeProxy. + """ + self._ProxyClass = proxy_class self._probes = {} self._current_activity = None @@ -431,7 +440,7 @@ class ProbeManager(object): if activity_id in self._probes: raise RuntimeWarning("Activity already attached") - self._probes[activity_id] = ProbeProxy(activity_id) + self._probes[activity_id] = self._ProxyClass(activity_id) #TODO what do we do with this? Raise something? if self._probes[activity_id].isAlive(): print "Alive!" @@ -442,6 +451,8 @@ class ProbeManager(object): if activity_id in self._probes: probe = self._probes.pop(activity_id) probe.detach() + if self._current_activity == activity_id: + self._current_activity = None def install(self, action, block=False): """ -- cgit v0.9.1