Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/tests/probetests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/probetests.py')
-rw-r--r--tests/probetests.py84
1 files changed, 53 insertions, 31 deletions
diff --git a/tests/probetests.py b/tests/probetests.py
index 59072e5..37748d8 100644
--- a/tests/probetests.py
+++ b/tests/probetests.py
@@ -85,40 +85,43 @@ class MockProbeProxy(object):
@param activityName unique activity id. Must be a valid dbus bus name.
"""
self.MockAction = None
+ self.MockActionName = None
self.MockActionUpdate = None
self.MockEvent = None
self.MockCB = None
self.MockAlive = True
self.MockEventAddr = None
+ self.MockAddressCallback = None
def isAlive(self):
return self.MockAlive
- def install(self, action, block=False):
+ def install(self, action, action_installed_cb, error_cb):
self.MockAction = action
+ self.MockAddressCallback_install = action_installed_cb
+ self.MockInstallErrorCallback = error_cb
self.MockActionUpdate = None
return None
- def update(self, action, newaction, block=False):
- self.MockAction = action
+ def update(self, action_address, newaction, block=False):
+ self.MockActionAddress = action_address
self.MockActionUpdate = newaction
return None
- def uninstall(self, action, block=False):
+ def uninstall(self, action_address):
self.MockAction = None
self.MockActionUpdate = None
return None
- def subscribe(self, event, callback, block=True):
+ def subscribe(self, event, notif_cb, subscribe_cb, error_cb):
#Do like the current Probe
- if not block:
- raise RuntimeError("This function does not allow non-blocking mode yet")
-
- self.MockEvent= event
- self.MockCB = callback
+ self.MockEvent = event
+ self.MockCB = notif_cb
+ self.MockSubscribeCB = subscribe_cb
+ self.MockSubscriptionErrorCb = error_cb
return str(id(event))
- def unsubscribe(self, address, block=True):
+ def unsubscribe(self, address):
self.MockEventAddr = address
return None
@@ -343,29 +346,34 @@ class ProbeManagerTest(unittest.TestCase):
act2 = self.probeManager.get_registered_probes_list("act2")[0][1]
ad1 = MockAddon()
+ ad1_address = "Address1"
+ def callback(value):
+ pass
+ def error_cb():
+ pass
#ErrorCase: install, update, uninstall without currentActivity
#Action functions should do a warning if there is no activity
- self.assertRaises(RuntimeWarning, self.probeManager.install, ad1)
- self.assertRaises(RuntimeWarning, self.probeManager.update, ad1, ad1)
- self.assertRaises(RuntimeWarning, self.probeManager.uninstall, ad1)
+ self.assertRaises(RuntimeWarning, self.probeManager.install, ad1_address, ad1, callback)
+ self.assertRaises(RuntimeWarning, self.probeManager.update, ad1_address, ad1)
+ self.assertRaises(RuntimeWarning, self.probeManager.uninstall, ad1_address)
assert act1.MockAction is None, "Action should not be installed on inactive proxy"
assert act2.MockAction is None, "Action should not be installed on inactive proxy"
self.probeManager.currentActivity = "act1"
- self.probeManager.install(ad1)
+ self.probeManager.install(ad1, callback, error_cb)
assert act1.MockAction == ad1, "Action should have been installed"
assert act2.MockAction is None, "Action should not be installed on inactive proxy"
- self.probeManager.update(ad1, ad1)
+ self.probeManager.update(ad1_address, ad1)
assert act1.MockActionUpdate == ad1, "Action should have been updated"
assert act2.MockActionUpdate is None, "Should not update on inactive"
self.probeManager.currentActivity = "act2"
- self.probeManager.uninstall(ad1)
- assert act1.MockAction == ad1, "Action should still be installed"
+ self.probeManager.uninstall(ad1_address)
+ assert act1.MockActionAddress == ad1_address, "Action should still be installed"
self.probeManager.currentActivity = "act1"
- self.probeManager.uninstall(ad1)
+ self.probeManager.uninstall(ad1_address)
assert act1.MockAction is None, "Action should be uninstalled"
def test_events(self):
@@ -379,17 +387,19 @@ class ProbeManagerTest(unittest.TestCase):
ad2.i, ad2.s = (2, "test2")
cb1 = lambda *args: None
+ install_cb1 = lambda *args:None
+ error_cb1 = lambda *args:None
cb2 = lambda *args: None
#ErrorCase: unsubscribe and subscribe without current activity
#Event functions should do a warning if there is no activity
- self.assertRaises(RuntimeWarning, self.probeManager.subscribe, ad1, cb1)
+ self.assertRaises(RuntimeWarning, self.probeManager.subscribe, ad1, cb1, install_cb1, error_cb1)
self.assertRaises(RuntimeWarning, self.probeManager.unsubscribe, None)
assert act1.MockEvent is None, "No event should be on act1"
assert act2.MockEvent is None, "No event should be on act2"
self.probeManager.currentActivity = "act1"
- self.probeManager.subscribe(ad1, cb1)
+ self.probeManager.subscribe(ad1, cb1, install_cb1, error_cb1)
assert act1.MockEvent == ad1, "Event should have been installed"
assert act1.MockCB == cb1, "Callback should have been set"
assert act2.MockEvent is None, "No event should be on act2"
@@ -398,7 +408,6 @@ class ProbeManagerTest(unittest.TestCase):
assert act1.MockEventAddr == "SomeAddress", "Unsubscribe should have been called"
assert act2.MockEventAddr is None, "Unsubscribe should not have been called"
-
class ProbeProxyTest(unittest.TestCase):
def setUp(self):
dbus.SessionBus = MockSessionBus
@@ -422,46 +431,59 @@ class ProbeProxyTest(unittest.TestCase):
action.i, action.s = 5, "action"
action2 = MockAddon()
action2.i, action2.s = 10, "action2"
+ action2_address = "Addr2"
#Check if the installed action is the good one
address = "Addr1"
+
+ def action_installed_cb(value):
+ pass
+ def error_cb(value):
+ pass
+
#Set the return value of probe install
self.mockObj.MockRet["install"] = address
- self.probeProxy.install(action, block=True)
+ self.probeProxy.install(action, action_installed_cb, error_cb)
assert pickle.loads(self.mockObj.MockCall["install"]["args"][0]) == action, "1 argument, the action"
+ self.mockObj.MockCall["install"]["kwargs"]["reply_handler"](address)
#ErrorCase: Update should fail on noninstalled actions
- self.assertRaises(RuntimeWarning, self.probeProxy.update, action2, action2, block=True)
+ self.assertRaises(RuntimeWarning, self.probeProxy.update, action2_address, action2)
#Test the update
- self.probeProxy.update(action, action2, block=True)
+ self.probeProxy.update(address, action2)
args = self.mockObj.MockCall["update"]["args"]
assert args[0] == address, "arg 1 should be the action address"
assert pickle.loads(args[1]) == action2._props, "arg2 should be the new action properties"
#ErrorCase: Uninstall on not installed action (silent fail)
#Test the uninstall
- self.probeProxy.uninstall(action2, block=True)
+ self.probeProxy.uninstall(action2_address)
assert not "uninstall" in self.mockObj.MockCall, "Uninstall should not be called if action is not installed"
- self.probeProxy.uninstall(action, block=True)
+ self.probeProxy.uninstall(address)
assert self.mockObj.MockCall["uninstall"]["args"][0] == address, "1 argument, the action address"
def test_events(self):
event = MockAddon()
event.i, event.s = 5, "event"
+ event_address = 'event1'
event2 = MockAddon()
event2.i, event2.s = 10, "event2"
+ event_address2 = 'event2'
def callback(event):
global message_box
message_box = event
+ subs_cb = lambda *args : None
+ error_cb = lambda *args : None
#Check if the installed event is the good one
address = "Addr1"
#Set the return value of probe subscribe
self.mockObj.MockRet["subscribe"] = address
- self.probeProxy.subscribe(event, callback, block=True)
+ self.probeProxy.subscribe(event, callback, subs_cb, error_cb)
+ self.probeProxy._ProbeProxy__update_event(event, callback, subs_cb, event_address)
assert pickle.loads(self.mockObj.MockCall["subscribe"]["args"][0]) == event, "1 argument, the event"
#Call the callback with the event
@@ -478,11 +500,11 @@ class ProbeProxyTest(unittest.TestCase):
#ErrorCase: unsubcribe for non subscribed event
#Test the unsubscribe
- self.probeProxy.unsubscribe("otheraddress", block=True)
+ self.probeProxy.unsubscribe(event_address2)
assert not "unsubscribe" in self.mockObj.MockCall, "Unsubscribe should not be called if event is not subscribeed"
- self.probeProxy.unsubscribe(address, block=True)
- assert self.mockObj.MockCall["unsubscribe"]["args"][0] == address, "1 argument, the event address"
+ self.probeProxy.unsubscribe(event_address)
+ assert self.mockObj.MockCall["unsubscribe"]["args"][0] == event_address, "1 argument, the event address"
#ErrorCase: eventOccured triggered by uninstalled event
#Test the callback with unregistered event