Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVincent Vinet <vince.vinet@gmail.com>2009-10-29 14:37:57 (GMT)
committer Vincent Vinet <vince.vinet@gmail.com>2009-10-29 14:37:57 (GMT)
commit91757a5b113841e2c1a465c184e111aeac8fd1d6 (patch)
tree89a54d419fbf45b330c17457a49fd344ffaaed0a
parentd5e7b4d380dfa9b979071b862e67d29ec127c542 (diff)
add the probe tests
-rw-r--r--tests/probetests.py472
-rw-r--r--tutorius/TProbe.py21
2 files changed, 465 insertions, 28 deletions
diff --git a/tests/probetests.py b/tests/probetests.py
index a440334..3690a25 100644
--- a/tests/probetests.py
+++ b/tests/probetests.py
@@ -19,45 +19,471 @@ Probe Tests
"""
import unittest
-import os, sys
-import gtk
-import time
+import pickle
from dbus.mainloop.glib import DBusGMainLoop
+from dbus.mainloop import NULL_MAIN_LOOP
import dbus
-from sugar.tutorius.TProbe import TProbe, ProbeProxy
+from sugar.tutorius.TProbe import TProbe, ProbeProxy, ProbeManager
+from sugar.tutorius import addon
+from sugar.tutorius.actions import Action
+from sugar.tutorius.properties import TIntProperty, TStringProperty
-class FakeActivity(object):
- def __init__(self):
- self.top = gtk.Window(type=gtk.WINDOW_TOPLEVEL)
- self.top.set_name("Top")
+#Create a substitute addon create function
+old_addon_create = addon.create
+fake_addon_cache = {}
+def new_addon_create(name, *args, **kwargs):
+ if name in fake_addon_cache:
+ return fake_addon_cache[name](*args, **kwargs)
+ else:
+ return old_addon_create(name, *args, **kwargs)
+
+message_box = None
+event_box = None
+
+class MockAddon(Action):
+ i = TIntProperty(0)
+ s = TStringProperty("test")
+
+ def do(self):
+ global message_box
+ message_box = (self.i, self.s)
+
+ def undo(self):
+ global message_box
+ message_box = None
+
+ def install_handlers(self, callback, **kwargs):
+ global message_box
+ message_box = callback
+
+ def remove_handlers(self):
+ global message_box
+ message_box = None
+
+fake_addon_cache["MockAddon"] = MockAddon
+
+class MockActivity(object):
+ pass
+
+class MockProbeProxy(object):
+ _MockProxyCache = {}
+ def __new__(cls, activityName):
+ #For testing, use only one instance per activityName
+ return cls._MockProxyCache.setdefault(activityName, super(MockProbeProxy, cls).__new__(cls))
+
+ def __init__(self, activityName):
+ """
+ Constructor
+ @param activityName unique activity id. Must be a valid dbus bus name.
+ """
+ self.MockAction = None
+ self.MockActionUpdate = None
+ self.MockEvent = None
+ self.MockCB = None
+ self.MockAlive = True
+ self.MockEventAddr = None
- hbox = gtk.HBox()
- self.top.add(hbox)
- hbox.show()
+ def isAlive(self):
+ return self.MockAlive
+
+ def install(self, action, block=False):
+ self.MockAction = action
+ self.MockActionUpdate = None
+ return None
+
+ def update(self, action, newaction, block=False):
+ self.MockAction = action
+ self.MockActionUpdate = newaction
+ return None
- btn1 = gtk.Button()
- btn1.set_name("Button1")
- hbox.pack_start(btn1)
- btn1.show()
- self.button = btn1
+ def uninstall(self, action, block=False):
+ self.MockAction = None
+ self.MockActionUpdate = None
+ return None
+ def subscribe(self, event, callback, block=True):
+ #Do like the current Probe
+ if not block:
+ raise RuntimeError("This function does not allow non-blocking mode yet")
+
+ self.MockEvent= event
+ self.MockCB = callback
+ return str(id(event))
+
+ def unsubscribe(self, address, block=True):
+ self.MockEventAddr = address
+ return None
+
+ def detach(self, block=False):
+ self.MockAction = None
+ self.MockActionUpdate = None
+ self.MockEvent = None
+ self.MockCB = None
+ self.MockAlive = False
+ self.MockEventAddr = None
+ return None
+
+class MockProxyObject(object):
+ _MockProxyObjects = {}
+ def __new__(cls, name, path):
+ return cls._MockProxyObjects.setdefault((name, path), super(MockProxyObject, cls).__new__(cls))
+
+ def __init__(self, name, path):
+ self.MockCall = {}
+ self.MockRet = {}
+ self.MockCB = {}
+
+ def get_dbus_method(self, name, *args, **kwargs):
+ #FIXME This mockMethod should support asynchronous calling,
+ # and possibly more
+ def mockMethod(*a, **kw):
+ self.MockCall[name] = dict(args=a, kwargs=kw)
+ return self.MockRet.get(name, None)
+ return mockMethod
+
+ def connect_to_signal(self, signal_name, handler_function, dbus_interface=None, **kw):
+ self.MockCB[signal_name] = dict(handler_function=handler_function, dbus_interface=dbus_interface, **kw)
+
+class MockSessionBus(object):
+ def get_object(self, bus_name, object_path, introspect=True, follow_name_owner_changes=False, **kwargs):
+ return MockProxyObject(bus_name, object_path)
+
+old_SessionBus = dbus.SessionBus
+
+###########################################################################
+# Begin Test Cases
+###########################################################################
class ProbeTest(unittest.TestCase):
- def test_ping(self):
+ def setUp(self):
+ global message_box
+ message_box = None
+
+ #Fix the addon create
+ addon.create = new_addon_create
+
+ #Set a default dbus mainloop
m = DBusGMainLoop(set_as_default=True)
dbus.set_default_main_loop(m)
- activity = FakeActivity()
- probe = TProbe("localhost.unittest.ProbeTest", activity.top)
+ #Setup the activity and probe
+ self.activity = MockActivity()
+ self.probe = TProbe("localhost.unittest.ProbeTest", self.activity)
- #Parent, ping the probe
- proxy = ProbeProxy("localhost.unittest.ProbeTest")
- res = probe.ping()
-
+ #Override the eventOccured on the Probe...
+ self.old_eO = self.probe.eventOccured
+ def newEo(event):
+ global event_box
+ try:
+ self.old_eO(event)
+ event_box = event
+ except RuntimeError:
+ event_box = None
+
+ self.probe.eventOccured = newEo
+
+ def tearDown(self):
+ #Replace addon create
+ addon.create = old_addon_create
+
+ #Clear the default dbus mainloop
+ dbus.set_default_main_loop(NULL_MAIN_LOOP)
+
+ #Clear the activity
+ self.probe.remove_from_connection()
+ del self.probe
+ del self.activity
+
+ def test_ping(self):
+ #Test ping()
+ res = self.probe.ping()
assert res == "alive", "Probe should be alive"
+ def test_action(self):
+ global message_box
+ action = MockAddon()
+ action.i, action.s = (5,"woot")
+
+ assert message_box is None, "Message box should still be empty"
+
+ #install 1
+ address = self.probe.install(pickle.dumps(action))
+ assert type(address) == str, "install should return a string"
+ assert message_box == (5, "woot"), "message box should have (i, s)"
+
+ #install 2
+ action.i, action.s = (10, "ahhah!")
+ address2 = self.probe.install(pickle.dumps(action))
+ assert message_box == (10, "ahhah!"), "message box should have changed"
+ assert address != address2, "action addresses should be different"
+
+ #uninstall 2
+ self.probe.uninstall(address2)
+ assert message_box is None, "undo should clear the message box"
+
+ #update action 1 with action 2 props
+ self.probe.update(address, pickle.dumps(action._props))
+ assert message_box == (10, "ahhah!"), "message box should have changed(i, s)"
+
+ #try to update 2, should fail
+ self.assertRaises(KeyError, self.probe.update, address2, pickle.dumps(action._props))
+
+ self.probe.uninstall(address)
+ assert message_box is None, "undo should clear the message box"
+
+ message_box = "Test"
+ #Uninstall twice should do nothing
+ self.probe.uninstall(address)
+ assert message_box == "Test", "undo should not have happened again"
+
+ def test_events(self):
+ global message_box
+ global event_box
+
+ event = MockAddon()
+ event.i, event.s = (0, "event1")
+ event2 = MockAddon()
+ event2.i, event2.s = (1, "event2")
+
+ addr = self.probe.subscribe(pickle.dumps(event))
+ cb1 = message_box
+ addr2 = self.probe.subscribe(pickle.dumps(event2))
+ cb2 = message_box
+ assert type(addr) == str, "should return a string address"
+ assert addr != addr2, "each subscribe should return a different address"
+
+ assert event_box is None, "event_box should still be empty"
+ #Do the callback 2
+ cb2()
+
+ assert event_box is not None, "event_box should have an event"
+
+ assert type(event_box) == str, "event should be pickled"
+ assert pickle.loads(event_box) == event2, "event should be event2"
+
+ #Unsubscribe event 2
+ self.probe.unsubscribe(addr2)
+ assert message_box is None, "unsubscribe should clear the message_box"
+
+ #Do the callback 1
+ cb1()
+ assert pickle.loads(event_box) == event, "event should be event1"
+
+ #unsubscribe event 1
+ self.probe.unsubscribe(addr)
+ assert message_box is None, "unsubscribe should clear the message_box"
+
+ event_box = None
+ #Do the callback 1 again
+ self.assertRaises(RuntimeWarning, cb1)
+
+class ProbeManagerTest(unittest.TestCase):
+ def setUp(self):
+ MockProbeProxy._MockProxyCache = {}
+ self.probeManager = ProbeManager(proxy_class=MockProbeProxy)
+
+ def test_attach(self):
+ #Attempt to set to a non existing activity
+ try:
+ self.probeManager.currentActivity = "act1"
+ assert False, "Exception expected"
+ except RuntimeError, e:
+ pass
+
+ #Attach an activity
+ self.probeManager.attach("act1")
+
+ #Should have been created
+ assert "act1" in MockProbeProxy._MockProxyCache.keys(), "Proxy not created"
+ act1 = MockProbeProxy("act1")
+
+ #Try to attach again
+ self.assertRaises(RuntimeWarning, self.probeManager.attach, "act1")
+
+ #Set current activity should work
+ self.probeManager.currentActivity = "act1"
+
+ #TODO Fill in the alive/notalive behavior at creation time once
+ # it is fixed in the ProbeManager
+
+ def test_detach(self):
+ #attach an activity
+ self.probeManager.attach("act1")
+ self.probeManager.currentActivity = "act1"
+ act1 = MockProbeProxy("act1")
+
+ #Now we detach
+ self.probeManager.detach("act1")
+ assert act1.MockAlive == False, "ProbeProxy should have been detached"
+ assert self.probeManager.currentActivity is None, "Current activity should be None"
+
+ #Attempt to detach again, should do nothing
+ self.probeManager.detach("act1")
+
+ #Now, attach 2 activities
+ self.probeManager.attach("act2")
+ self.probeManager.attach("act3")
+ act2 = MockProbeProxy("act2")
+ act3 = MockProbeProxy("act3")
+
+ self.probeManager.currentActivity = "act2"
+
+ assert act2.MockAlive and act3.MockAlive, "Both ProbeProxy instances should be alive"
+
+ #Detach the not active activity
+ self.probeManager.detach("act3")
+ #Check the statuses
+ assert act2.MockAlive and not act3.MockAlive, "Only act2 should be alive"
+ assert self.probeManager.currentActivity == "act2", "act2 should not have failed"
+
+ def test_actions(self):
+ self.probeManager.attach("act1")
+ self.probeManager.attach("act2")
+ act1 = MockProbeProxy("act1")
+ act2 = MockProbeProxy("act2")
+
+ ad1 = MockAddon()
+ #Action functions should do a warning if there is no activity
+ self.assertRaises(RuntimeWarning, self.probeManager.install, ad1)
+ self.assertRaises(RuntimeWarning, self.probeManager.update, ad1, ad1)
+ self.assertRaises(RuntimeWarning, self.probeManager.uninstall, ad1)
+ assert act1.MockAction is None, "Action should not be installed on inactive proxy"
+ assert act1.MockAction is None, "Action should not be installed on inactive proxy"
+
+ self.probeManager.currentActivity = "act1"
+ self.probeManager.install(ad1)
+ assert act1.MockAction == ad1, "Action should have been installed"
+ assert act2.MockAction is None, "Action should not be installed on inactive proxy"
+
+ self.probeManager.update(ad1, ad1)
+ assert act1.MockActionUpdate == ad1, "Action should have been updated"
+ assert act2.MockActionUpdate is None, "Should not update on inactive"
+
+ self.probeManager.currentActivity = "act2"
+ self.probeManager.uninstall(ad1)
+ assert act1.MockAction == ad1, "Action should still be installed"
+
+ self.probeManager.currentActivity = "act1"
+ self.probeManager.uninstall(ad1)
+ assert act1.MockAction is None, "Action should be uninstalled"
+
+ def test_events(self):
+ self.probeManager.attach("act1")
+ self.probeManager.attach("act2")
+ act1 = MockProbeProxy("act1")
+ act2 = MockProbeProxy("act2")
+
+ ad1 = MockAddon()
+ ad2 = MockAddon()
+ ad2.i, ad2.s = (2, "test2")
+
+ cb1 = lambda *args: None
+ cb2 = lambda *args: None
+
+ #Event functions should do a warning if there is no activity
+ self.assertRaises(RuntimeWarning, self.probeManager.subscribe, ad1, cb1)
+ self.assertRaises(RuntimeWarning, self.probeManager.unsubscribe, None)
+ assert act1.MockEvent is None, "No event should be on act1"
+ assert act2.MockEvent is None, "No event should be on act2"
+
+ self.probeManager.currentActivity = "act1"
+ self.probeManager.subscribe(ad1, cb1)
+ assert act1.MockEvent == ad1, "Event should have been installed"
+ assert act1.MockCB == cb1, "Callback should have been set"
+ assert act2.MockEvent is None, "No event should be on act2"
+
+ self.probeManager.unsubscribe("SomeAddress")
+ assert act1.MockEventAddr == "SomeAddress", "Unsubscrive should have been called"
+ assert act2.MockEventAddr is None, "Unsubscrive should not have been called"
+
+class ProbeProxyTest(unittest.TestCase):
+ def setUp(self):
+ dbus.SessionBus = MockSessionBus
+
+ self.mockObj = MockProxyObject("unittest.TestCase", "/tutorius/Probe")
+ self.probeProxy = ProbeProxy("unittest.TestCase")
+
+ def tearDown(self):
+ dbus.SessionBus = old_SessionBus
+ MockProxyObject._MockProxyObjects = {}
+
+ def test_Alive(self):
+ self.mockObj.MockRet["ping"] = "alive"
+ assert self.probeProxy.isAlive() == True, "Alive should return True"
+
+ self.mockObj.MockRet["ping"] = "anything else"
+ assert self.probeProxy.isAlive() == False, "Alive should return False"
+
+ def test_actions(self):
+ action = MockAddon()
+ action.i, action.s = 5, "action"
+ action2 = MockAddon()
+ action2.i, action2.s = 10, "action2"
+
+ #Check if the installed action is the good one
+ address = "Addr1"
+ #Set the return value of probe install
+ self.mockObj.MockRet["install"] = address
+ self.probeProxy.install(action, block=True)
+ assert pickle.loads(self.mockObj.MockCall["install"]["args"][0]) == action, "1 argument, the action"
+
+ #Update should fail on noninstalled actions
+ self.assertRaises(RuntimeWarning, self.probeProxy.update, action2, action2, block=True)
+
+ #Test the update
+ self.probeProxy.update(action, action2, block=True)
+ args = self.mockObj.MockCall["update"]["args"]
+ assert args[0] == address, "arg 1 should be the action address"
+ assert pickle.loads(args[1]) == action2._props, "arg2 should be the new action properties"
+
+ #Test the uninstall
+ self.probeProxy.uninstall(action2, block=True)
+ assert not "uninstall" in self.mockObj.MockCall, "Uninstall should not be called if action is not installed"
+
+ self.probeProxy.uninstall(action, block=True)
+ assert self.mockObj.MockCall["uninstall"]["args"][0] == address, "1 argument, the action address"
+
+ def test_events(self):
+ event = MockAddon()
+ event.i, event.s = 5, "event"
+ event2 = MockAddon()
+ event2.i, event2.s = 10, "event2"
+
+ def callback(event):
+ global message_box
+ message_box = event
+
+ #Check if the installed event is the good one
+ address = "Addr1"
+ #Set the return value of probe subscribe
+ self.mockObj.MockRet["subscribe"] = address
+ self.probeProxy.subscribe(event, callback, block=True)
+ assert pickle.loads(self.mockObj.MockCall["subscribe"]["args"][0]) == event, "1 argument, the event"
+
+ #Call the callback with the event
+ global message_box
+ self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event))
+ assert message_box == event, "callback should have been called with event"
+ message_box = None
+
+ #Call with a wrong event
+ self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event2))
+ assert message_box is None, "callback should not have been called"
+
+
+ #Test the unsubscribe
+ self.probeProxy.unsubscribe("otheraddress", block=True)
+ assert not "unsubscribe" in self.mockObj.MockCall, "Unsubscribe should not be called if event is not subscribeed"
+
+ self.probeProxy.unsubscribe(address, block=True)
+ assert self.mockObj.MockCall["unsubscribe"]["args"][0] == address, "1 argument, the event address"
+
+ #Test the callback with unregistered event
+ self.mockObj.MockCB["eventOccured"]["handler_function"](pickle.dumps(event))
+ assert message_box is None, "callback should not have been called"
+
if __name__ == "__main__":
unittest.main()
diff --git a/tutorius/TProbe.py b/tutorius/TProbe.py
index 867ef1c..afeba6d 100644
--- a/tutorius/TProbe.py
+++ b/tutorius/TProbe.py
@@ -195,7 +195,11 @@ class TProbe(dbus.service.Object):
# The actual method we will call on the probe to send events
def notify(self, event):
LOGGER.debug("TProbe :: notify event %s", str(event))
- self.eventOccured(pickle.dumps(event))
+ #Check that this event is even allowed
+ if event in self._subscribedEvents.values():
+ self.eventOccured(pickle.dumps(event))
+ else:
+ raise RuntimeWarning("Attempted to raise an unregistered event")
# Return a unique name for this action
def _generate_action_reference(self, action):
@@ -400,8 +404,8 @@ class ProbeProxy:
Detach the ProbeProxy from it's TProbe. All installed actions and
subscribed events should be removed.
"""
- for action in self._actions.keys():
- self.uninstall(action, block)
+ for action_addr in self._actions.keys():
+ self.uninstall(action_addr, block)
for address in self._subscribedEvents.keys():
self.unsubscribe(address, block)
@@ -414,7 +418,12 @@ class ProbeManager(object):
For now, it only handles one at a time, though.
Actually it doesn't do much at all. But it keeps your encapsulation happy
"""
- def __init__(self):
+ def __init__(self, proxy_class=ProbeProxy):
+ """Constructor
+ @param proxy_class Class to use for creating Proxies to activities.
+ The class should support the same interface as ProbeProxy.
+ """
+ self._ProxyClass = proxy_class
self._probes = {}
self._current_activity = None
@@ -431,7 +440,7 @@ class ProbeManager(object):
if activity_id in self._probes:
raise RuntimeWarning("Activity already attached")
- self._probes[activity_id] = ProbeProxy(activity_id)
+ self._probes[activity_id] = self._ProxyClass(activity_id)
#TODO what do we do with this? Raise something?
if self._probes[activity_id].isAlive():
print "Alive!"
@@ -442,6 +451,8 @@ class ProbeManager(object):
if activity_id in self._probes:
probe = self._probes.pop(activity_id)
probe.detach()
+ if self._current_activity == activity_id:
+ self._current_activity = None
def install(self, action, block=False):
"""