Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorerick <erick@sugar-dev-erick.(none)>2009-11-06 15:50:06 (GMT)
committer erick <erick@sugar-dev-erick.(none)>2009-11-06 15:50:06 (GMT)
commitb3039144e050555ad93c1ab18ae228b5b1970fb5 (patch)
tree2273d4e3ea3c65902a220dc96579dc470de01a85
parent4c1713d9d51769a7281048fb3bd99da35a3977fa (diff)
-rw-r--r--tests/probetests.py131
-rw-r--r--tutorius/TProbe.py43
2 files changed, 86 insertions, 88 deletions
diff --git a/tests/probetests.py b/tests/probetests.py
index e1a587b..977eda9 100644
--- a/tests/probetests.py
+++ b/tests/probetests.py
@@ -66,15 +66,20 @@ class MockAddon(Action):
fake_addon_cache["MockAddon"] = MockAddon
class MockActivity(object):
- pass
+ def get_bundle_id(self):
+ return "localhost.unittest.ProbeTest"
+
+ def get_id(self):
+ return "unique_id_1"
+
class MockProbeProxy(object):
_MockProxyCache = {}
- def __new__(cls, activityName):
+ def __new__(cls, activityName, unique_id):
#For testing, use only one instance per activityName
return cls._MockProxyCache.setdefault(activityName, super(MockProbeProxy, cls).__new__(cls))
- def __init__(self, activityName):
+ def __init__(self, activityName, unique_id):
"""
Constructor
@param activityName unique activity id. Must be a valid dbus bus name.
@@ -153,6 +158,12 @@ class MockSessionBus(object):
old_SessionBus = dbus.SessionBus
+class MockServiceProxy(object):
+ def register_probe(self, process_name, unique_id):
+ pass
+ def unregister_probe(self, unique_id):
+ pass
+
###########################################################################
# Begin Test Cases
###########################################################################
@@ -170,7 +181,7 @@ class ProbeTest(unittest.TestCase):
#Setup the activity and probe
self.activity = MockActivity()
- self.probe = TProbe("localhost.unittest.ProbeTest", self.activity)
+ self.probe = TProbe(self.activity, MockServiceProxy())
#Override the eventOccured on the Probe...
self.old_eO = self.probe.eventOccured
@@ -287,67 +298,49 @@ class ProbeManagerTest(unittest.TestCase):
MockProbeProxy._MockProxyCache = {}
self.probeManager = ProbeManager(proxy_class=MockProbeProxy)
- def test_attach(self):
- #ErrorCase: Set currentActivity to unattached activity
- #Attempt to set to a non existing activity
- try:
- self.probeManager.currentActivity = "act1"
- assert False, "Exception expected"
- except RuntimeError, e:
- pass
-
- #Attach an activity
- self.probeManager.attach("act1")
-
- #Should have been created
- assert "act1" in MockProbeProxy._MockProxyCache.keys(), "Proxy not created"
-
- #ErrorCase: Attach multiple times to same activity
- #Try to attach again
- self.assertRaises(RuntimeWarning, self.probeManager.attach, "act1")
-
- #Set current activity should work
- self.probeManager.currentActivity = "act1"
-
- #TODO Fill in the alive/notalive behavior at creation time once
- # it is fixed in the ProbeManager
-
- def test_detach(self):
- #attach an activity
- self.probeManager.attach("act1")
- self.probeManager.currentActivity = "act1"
- act1 = MockProbeProxy("act1")
-
- #Now we detach
- self.probeManager.detach("act1")
- assert act1.MockAlive == False, "ProbeProxy should have been detached"
- assert self.probeManager.currentActivity is None, "Current activity should be None"
-
- #Attempt to detach again, should do nothing
- #ErrorCase: detach already detached (currently silent fail)
- self.probeManager.detach("act1")
-
- #Now, attach 2 activities
- self.probeManager.attach("act2")
- self.probeManager.attach("act3")
- act2 = MockProbeProxy("act2")
- act3 = MockProbeProxy("act3")
-
- self.probeManager.currentActivity = "act2"
-
- assert act2.MockAlive and act3.MockAlive, "Both ProbeProxy instances should be alive"
+ def test_register_probe(self):
+ assert len(self.probeManager.get_registered_probes_list()) == 0
+
+ self.probeManager.register_probe("act1", "unique_id_1")
+ assert len(self.probeManager.get_registered_probes_list()) == 1
+ assert len(self.probeManager.get_registered_probes_list("act1")) == 1
+ assert self.probeManager.get_registered_probes_list()[0][0] == "unique_id_1"
+
+ self.probeManager.register_probe("act2","unique_id_2")
+ assert len(self.probeManager.get_registered_probes_list()) == 2
+ assert len(self.probeManager.get_registered_probes_list("act1")) == 1
+ assert self.probeManager.get_registered_probes_list("act1")[0][0] == "unique_id_1"
+ assert len(self.probeManager.get_registered_probes_list("act2")) == 1
+ assert self.probeManager.get_registered_probes_list("act2")[0][0] == "unique_id_2"
+
+ def test_register_multiple_probes(self):
+ assert len(self.probeManager.get_registered_probes_list()) == 0
+
+ self.probeManager.register_probe("act1", "unique_id_1")
+ self.probeManager.register_probe("act1","unique_id_2")
+ assert len(self.probeManager.get_registered_probes_list()) == 2
+ assert len(self.probeManager.get_registered_probes_list("act1")) == 2
+ assert self.probeManager.get_registered_probes_list("act1")[0][0] == "unique_id_1"
+ assert self.probeManager.get_registered_probes_list("act1")[1][0] == "unique_id_2"
+
+ def test_unregister_probe(self):
+ assert len(self.probeManager.get_registered_probes_list()) == 0
+ self.probeManager.register_probe("act1", "unique_id_1")
+ self.probeManager.register_probe("act1","unique_id_2")
+
+ self.probeManager.unregister_probe("unique_id_1")
+ assert len(self.probeManager.get_registered_probes_list("act1")) == 1
+ assert self.probeManager.get_registered_probes_list("act1")[0][0] == "unique_id_2"
- #Detach the not active activity
- self.probeManager.detach("act3")
- #Check the statuses
- assert act2.MockAlive and not act3.MockAlive, "Only act2 should be alive"
- assert self.probeManager.currentActivity == "act2", "act2 should not have failed"
+ self.probeManager.unregister_probe("unique_id_2")
+ assert len(self.probeManager.get_registered_probes_list("act1")) == 0
+ assert self.probeManager.get_registered_probes_list("act1") == []
def test_actions(self):
- self.probeManager.attach("act1")
- self.probeManager.attach("act2")
- act1 = MockProbeProxy("act1")
- act2 = MockProbeProxy("act2")
+ self.probeManager.register_probe("act1", "unique_id_1")
+ self.probeManager.register_probe("act2", "unique_id_2")
+ act1 = self.probeManager.get_registered_probes_list("act1")[0][1]
+ act2 = self.probeManager.get_registered_probes_list("act2")[0][1]
ad1 = MockAddon()
#ErrorCase: install, update, uninstall without currentActivity
@@ -376,10 +369,10 @@ class ProbeManagerTest(unittest.TestCase):
assert act1.MockAction is None, "Action should be uninstalled"
def test_events(self):
- self.probeManager.attach("act1")
- self.probeManager.attach("act2")
- act1 = MockProbeProxy("act1")
- act2 = MockProbeProxy("act2")
+ self.probeManager.register_probe("act1", "unique_id_1")
+ self.probeManager.register_probe("act2", "unique_id_2")
+ act1 = self.probeManager.get_registered_probes_list("act1")[0][1]
+ act2 = self.probeManager.get_registered_probes_list("act2")[0][1]
ad1 = MockAddon()
ad2 = MockAddon()
@@ -405,16 +398,18 @@ class ProbeManagerTest(unittest.TestCase):
assert act1.MockEventAddr == "SomeAddress", "Unsubscribe should have been called"
assert act2.MockEventAddr is None, "Unsubscribe should not have been called"
+
class ProbeProxyTest(unittest.TestCase):
def setUp(self):
dbus.SessionBus = MockSessionBus
- self.mockObj = MockProxyObject("unittest.TestCase", "/tutorius/Probe")
- self.probeProxy = ProbeProxy("unittest.TestCase")
+ self.mockObj = MockProxyObject("unittest.TestCase", "/tutorius/Probe/unique_id_1")
+ self.probeProxy = ProbeProxy("unittest.TestCase", "unique_id_1")
def tearDown(self):
dbus.SessionBus = old_SessionBus
MockProxyObject._MockProxyObjects = {}
+ # TODO: Clean-Up the dbus session bus ???
def test_Alive(self):
self.mockObj.MockRet["ping"] = "alive"
diff --git a/tutorius/TProbe.py b/tutorius/TProbe.py
index 1bd9935..08c9651 100644
--- a/tutorius/TProbe.py
+++ b/tutorius/TProbe.py
@@ -38,17 +38,23 @@ class TProbe(dbus.service.Object):
a DBUS Interface.
"""
- def __init__(self, activity):
+ def __init__(self, activity, service_proxy=None):
"""
Create and register a TProbe for an activity.
@param activity activity reference, must be a gtk container
+ @param service_proxy
"""
# Moving the ObjectStore assignment here, in the meantime
# the reference to the activity shouldn't be share as a
# global variable but passed by the Probe to the objects
# that requires it
self._activity = activity
+
+ if service_proxy == None:
+ from .service import ServiceProxy
+
+ self._service_proxy = service_proxy or ServiceProxy()
ObjectStore().activity = activity
@@ -70,8 +76,7 @@ class TProbe(dbus.service.Object):
self._subscribedEvents = {}
LOGGER.debug("TProbe :: registering '%s' with unique_id '%s'", self._activity_name, activity.get_id())
- from .service import ServiceProxy
- ServiceProxy().register_probe(self._activity_name, self._unique_id)
+ self._service_proxy.register_probe(self._activity_name, self._unique_id)
@@ -459,23 +464,6 @@ class ProbeManager(object):
return self._current_activity
currentActivity = property(fget=getCurrentActivity, fset=setCurrentActivity)
- def attach(self, activity_id):
- if activity_id in self._probes:
- raise RuntimeWarning("Activity already attached")
-
- self._probes[activity_id] = [self._ProxyClass(activity_id)]
- #TODO what do we do with this? Raise something?
- if self._probes[activity_id].isAlive():
- print "Alive!"
- else:
- print "FAil!"
-
- def detach(self, activity_id):
- if activity_id in self._probes:
- probe = self._probes.pop(activity_id)
- probe.detach()
- if self._current_activity == activity_id:
- self._current_activity = None
def install(self, action, block=False):
"""
@@ -570,6 +558,20 @@ class ProbeManager(object):
if len(proxies) == 0:
self._probes.pop(process_name)
+ def get_registered_probes_list(self, process_name=None):
+ if process_name == None:
+ probe_list = []
+ for probes in self._probes.itervalues():
+ probe_list.extend(probes)
+ return probe_list
+ else:
+ if process_name in self._probes:
+ return self._probes[process_name]
+ else:
+ return []
+
+
+
def _first_proxy(self, process_name):
"""
Returns the oldest probe connected under the process_name
@@ -581,3 +583,4 @@ class ProbeManager(object):
else:
raise RuntimeWarning("No activity attached under '%s'", process_name)
+