From 912528253fcf1fc43c1a2d02ffe6e540fe60d8e7 Mon Sep 17 00:00:00 2001 From: Vincent Vinet Date: Mon, 19 Oct 2009 20:15:41 +0000 Subject: Merge the TProbe Integration and fix merging induced bugs --- (limited to 'tutorius/TProbe.py') diff --git a/tutorius/TProbe.py b/tutorius/TProbe.py new file mode 100644 index 0000000..6d7b6e2 --- /dev/null +++ b/tutorius/TProbe.py @@ -0,0 +1,506 @@ +import logging +LOGGER = logging.getLogger("sugar.tutorius.TProbe") +import os + +import gobject + +import dbus +import dbus.service +import cPickle as pickle + +import sugar.tutorius.addon as addon + +from sugar.tutorius.services import ObjectStore +from sugar.tutorius.properties import TPropContainer + +from sugar.tutorius.dbustools import remote_call, save_args +import copy + +""" + -------------------- + | ProbeManager | + -------------------- + | + V + -------------------- ---------- + | ProbeProxy |<---- DBus ---->| TProbe | + -------------------- ---------- + +""" +#TODO Add stub error handling for remote calls in the classes so that it will +# be clearer how errors can be handled in the future. + + +class TProbe(dbus.service.Object): + """ Tutorius Probe + Defines an entry point for Tutorius into activities that allows + performing actions and registering events onto an activity via + a DBUS Interface. + """ + + def __init__(self, activity_name, activity): + """ + Create and register a TProbe for an activity. + + @param activity_name unique activity_id + @param activity activity reference, must be a gtk container + """ + LOGGER.debug("TProbe :: Creating TProbe for %s (%d)", activity_name, os.getpid()) + LOGGER.debug("TProbe :: Current gobject context: %s", str(gobject.main_context_default())) + LOGGER.debug("TProbe :: Current gobject depth: %s", str(gobject.main_depth())) + # Moving the ObjectStore assignment here, in the meantime + # the reference to the activity shouldn't be share as a + # global variable but passed by the Probe to the objects + # that requires it + self._activity = activity + + ObjectStore().activity = activity + + self._activity_name = activity_name + self._session_bus = dbus.SessionBus() + + # Giving a new name because _name is already used by dbus + self._name2 = dbus.service.BusName(activity_name, self._session_bus) + dbus.service.Object.__init__(self, self._session_bus, "/tutorius/Probe") + + # Add the dictionary we will use to store which actions and events + # are known + self._installedActions = {} + self._subscribedEvents = {} + + def start(self): + """ + Optional method to call if the probe is not inserted into an + existing activity. Starts a gobject mainloop + """ + mainloop = gobject.MainLoop() + print "Starting Probe for " + self._activity_name + mainloop.run() + + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='s', out_signature='') + def registered(self, service): + print ("Registered with: " + str(service)) + + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='', out_signature='s') + def ping(self): + """ + Allows testing the connection to a Probe + @return string "alive" + """ + return "alive" + + # ------------------ Action handling -------------------------------------- + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='s', out_signature='s') + def install(self, pickled_action): + """ + Install an action on the Activity + @param pickled_action string pickled action + @return string address of installed action + """ + loaded_action = pickle.loads(str(pickled_action)) + action = addon.create(loaded_action.__class__.__name__) + + address = self._generate_action_reference(action) + + self._installedActions[address] = action + + if action._props: + action._props.update(loaded_action._props) + + action.do() + + return address + + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='ss', out_signature='') + def update(self, address, action_props): + """ + Update an already registered action + @param address string address returned by install() + @param action_props pickled action properties + @return None + """ + action = self._installedActions[address] + + if action._props: + props = pickle.loads(str(action_props)) + action._props.update(props) + action.undo() + action.do() + + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='s', out_signature='') + def uninstall(self, address): + """ + Uninstall an action + @param address string address returned by install() + @return None + """ + if self._installedActions.has_key(address): + action = self._installedActions[address] + action.undo() + self._installedActions.pop(address) + + + # ------------------ Event handling --------------------------------------- + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='s', out_signature='s') + def subscribe(self, pickled_event): + """ + Subscribe to an Event + @param pickled_event string pickled EventFilter + @return string unique name of registered event + """ + #TODO Perform event unmapping once Tutorials use abstract events + # instead of concrete EventFilters that are tied to their + # implementation. + eventfilter = pickle.loads(str(pickled_event)) + + # The callback uses the event defined previously and each + # successive call to subscribe will register a different + # callback that references a different event + def callback(*args): + self.notify(eventfilter) + + eventfilter.install_handlers(callback, activity=self._activity) + + name = self._generate_event_reference(eventfilter) + self._subscribedEvents[name] = eventfilter + + return name + + @dbus.service.method("org.tutorius.ProbeInterface", + in_signature='s', out_signature='') + def unsubscribe(self, address): + """ + Remove subscription to an event + @param address string adress returned by subscribe() + @return None + """ + + if self._subscribedEvents.has_key(address): + eventfilter = self._subscribedEvents[address] + eventfilter.remove_handlers() + self._subscribedEvents.pop(address) + + @dbus.service.signal("org.tutorius.ProbeInterface") + def eventOccured(self, event): + # We need no processing now, the signal will be sent + # when the method exit + pass + + # The actual method we will call on the probe to send events + def notify(self, event): + LOGGER.debug("TProbe :: notify event %s", str(event)) + self.eventOccured(pickle.dumps(event)) + + # Return a unique name for this action + def _generate_action_reference(self, action): + # TODO elavoie 2009-07-25 Should return a universal address + name = action.__class__.__name__ + suffix = 1 + + while self._installedActions.has_key(name+str(suffix)): + suffix += 1 + + return name + str(suffix) + + + # Return a unique name for this event + def _generate_event_reference(self, event): + # TODO elavoie 2009-07-25 Should return a universal address + name = event.__class__.__name__ + #Keep the counter to avoid looping all the time + suffix = getattr(self, '_event_ref_suffix', 0 ) + 1 + + while self._subscribedEvents.has_key(name+str(suffix)): + suffix += 1 + + #setattr(self, '_event_ref_suffix', suffix) + + return name + str(suffix) + +class ProbeProxy: + """ + ProbeProxy is a Proxy class for connecting to a remote TProbe. + + It provides an object interface to the TProbe, which requires pickled + strings, across a DBus communication. + """ + def __init__(self, activityName): + """ + Constructor + @param activityName unique activity id. Must be a valid dbus bus name. + """ + LOGGER.debug("ProbeProxy :: Creating ProbeProxy for %s (%d)", activityName, os.getpid()) + LOGGER.debug("ProbeProxy :: Current gobject context: %s", str(gobject.main_context_default())) + LOGGER.debug("ProbeProxy :: Current gobject depth: %s", str(gobject.main_depth())) + bus = dbus.SessionBus() + self._object = bus.get_object(activityName, "/tutorius/Probe") + self._probe = dbus.Interface(self._object, "org.tutorius.ProbeInterface") + + self._actions = {} + # We keep those two data structures to be able to have multiple callbacks + # for the same event and be able to remove them independently + # _subscribedEvents holds a list of callback addresses's for each event + # _registeredCallbacks holds the functions to call for each address + self._subscribedEvents = {} + self._registeredCallbacks = {} + + + self._object.connect_to_signal("eventOccured", self._handle_signal, dbus_interface="org.tutorius.ProbeInterface") + + def _handle_signal(self, pickled_event): + event = pickle.loads(str(pickled_event)) + LOGGER.debug("ProbeProxy :: Received Event : %s %s", str(event), str(event._props.items())) + + LOGGER.debug("ProbeProxy :: Currently %d events registered", len(self._registeredCallbacks)) + if self._registeredCallbacks.has_key(event): + for callback in self._registeredCallbacks[event].values(): + callback(event) + else: + for event in self._registeredCallbacks.keys(): + LOGGER.debug("==== %s", str(event._props.items())) + LOGGER.debug("ProbeProxy :: Event does not appear to be registered") + + def isAlive(self): + try: + return self._probe.ping() == "alive" + except: + return False + + def __update_action(self, action, address): + LOGGER.debug("ProbeProxy :: Updating action %s with address %s", str(action), str(address)) + self._actions[action] = str(address) + + def __clear_action(self, action): + self._actions.pop(action, None) + + def install(self, action, block=False): + """ + Install an action on the TProbe's activity + @param action Action to install + @param block Force a synchroneous dbus call if True + @return None + """ + return remote_call(self._probe.install, (pickle.dumps(action),), + save_args(self.__update_action, action), + block=block) + + def update(self, action, newaction, block=False): + """ + Update an already installed action's properties and run it again + @param action Action to update + @param newaction Action to update it with + @param block Force a synchroneous dbus call if True + @return None + """ + #TODO review how to make this work well + if not action in self._actions: + raise RuntimeWarning("Action not installed") + #TODO Check error handling + return remote_call(self._probe.update, (self._actions[action], pickle.dumps(newaction._props)), block=block) + + def uninstall(self, action, block=False): + """ + Uninstall an installed action + @param action Action to uninstall + @param block Force a synchroneous dbus call if True + """ + if action in self._actions: + remote_call(self._probe.uninstall,(self._actions.pop(action),), block=block) + + def __update_event(self, event, callback, address): + LOGGER.debug("ProbeProxy :: Registered event %s with address %s", str(hash(event)), str(address)) + # Since multiple callbacks could be associated to the same + # event signature, we will store multiple callbacks + # in a dictionary indexed by the unique address + # given for this subscribtion and access this + # dictionary from another one indexed by event + address = str(address) + + # We use the event object as a key + if not self._registeredCallbacks.has_key(event): + self._registeredCallbacks[event] = {} + + # TODO elavoie 2009-07-25 decide on a proper exception + # taxonomy + if self._registeredCallbacks[event].has_key(address): + # Oups, how come we have two similar addresses? + # send the bad news! + raise Exception("Probe subscribe exception, the following address already exists: " + str(address)) + + self._registeredCallbacks[event][address] = callback + + # We will keep another dictionary to remember the + # event that was associated to this unique address + # Let's copy to make sure that even if the event + # passed in is modified later it won't screw up + # our dictionary (python pass arguments by reference) + self._subscribedEvents[address] = copy.copy(event) + + return address + + def __clear_event(self, address): + LOGGER.debug("ProbeProxy :: Unregistering adress %s", str(address)) + # Cleanup everything + if self._subscribedEvents.has_key(address): + event = self._subscribedEvents[address] + + if self._registeredCallbacks.has_key(event)\ + and self._registeredCallbacks[event].has_key(address): + self._registeredCallbacks[event].pop(address) + + if self._registeredCallbacks[event] == {}: + self._registeredCallbacks.pop(event) + + self._subscribedEvents.pop(address) + else: + LOGGER.debug("ProbeProxy :: unsubsribe address %s inconsistency : not registered", address) + + def subscribe(self, event, callback, block=True): + """ + Register an event listener + @param event Event to listen for + @param callback callable that will be called when the event occurs + @param block Force a synchroneous dbus call if True (Not allowed yet) + @return address identifier used for unsubscribing + """ + LOGGER.debug("ProbeProxy :: Registering event %s", str(hash(event))) + if not block: + raise RuntimeError("This function does not allow non-blocking mode yet") + + # TODO elavoie 2009-07-25 When we will allow for patterns both + # for event types and sources, we will need to revise the lookup + # mecanism for which callback function to call + return remote_call(self._probe.subscribe, (pickle.dumps(event),), + save_args(self.__update_event, event, callback), + block=block) + + def unsubscribe(self, address, block=True): + """ + Unregister an event listener + @param address identifier given by subscribe() + @param block Force a synchroneous dbus call if True + @return None + """ + LOGGER.debug("ProbeProxy :: Unregister adress %s issued", str(address)) + if not block: + raise RuntimeError("This function does not allow non-blocking mode yet") + if address in self._subscribedEvents.keys(): + remote_call(self._probe.unsubscribe, (address,), + return_cb=save_args(self.__clear_event, address), + block=block) + else: + LOGGER.debug("ProbeProxy :: unsubsribe address %s failed : not registered", address) + + def detach(self, block=False): + """ + Detach the ProbeProxy from it's TProbe. All installed actions and + subscribed events should be removed. + """ + for action in self._actions.keys(): + self.uninstall(action, block) + + for address in self._subscribedEvents.keys(): + self.unsubscribe(address, block) + + +class ProbeManager(object): + """ + The ProbeManager provides multiplexing across multiple activity ProbeProxies + + For now, it only handles one at a time, though. + Actually it doesn't do much at all. But it keeps your encapsulation happy + """ + def __init__(self): + self._probes = {} + self._current_activity = None + + def setCurrentActivity(self, activity_id): + if not activity_id in self._probes: + raise RuntimeError("Activity not attached") + self._current_activity = activity_id + + def getCurrentActivity(self): + return self._current_activity + + currentActivity = property(fget=getCurrentActivity, fset=setCurrentActivity) + def attach(self, activity_id): + if activity_id in self._probes: + raise RuntimeWarning("Activity already attached") + + self._probes[activity_id] = ProbeProxy(activity_id) + #TODO what do we do with this? Raise something? + if self._probes[activity_id].isAlive(): + print "Alive!" + else: + print "FAil!" + + def detach(self, activity_id): + if activity_id in self._probes: + probe = self._probes.pop(activity_id) + probe.detach() + + def install(self, action, block=False): + """ + Install an action on the current activity + @param action Action to install + @param block Force a synchroneous dbus call if True + @return None + """ + if self.currentActivity: + return self._probes[self.currentActivity].install(action, block) + else: + raise RuntimeWarning("No activity attached") + + def update(self, action, newaction, block=False): + """ + Update an already installed action's properties and run it again + @param action Action to update + @param newaction Action to update it with + @param block Force a synchroneous dbus call if True + @return None + """ + if self.currentActivity: + return self._probes[self.currentActivity].update(action, newaction, block) + else: + raise RuntimeWarning("No activity attached") + + def uninstall(self, action, block=False): + """ + Uninstall an installed action + @param action Action to uninstall + @param block Force a synchroneous dbus call if True + """ + if self.currentActivity: + return self._probes[self.currentActivity].uninstall(action, block) + else: + raise RuntimeWarning("No activity attached") + + def subscribe(self, event, callback): + """ + Register an event listener + @param event Event to listen for + @param callback callable that will be called when the event occurs + @return address identifier used for unsubscribing + """ + if self.currentActivity: + return self._probes[self.currentActivity].subscribe(event, callback) + else: + raise RuntimeWarning("No activity attached") + + def unsubscribe(self, address): + """ + Unregister an event listener + @param address identifier given by subscribe() + @return None + """ + if self.currentActivity: + return self._probes[self.currentActivity].unsubscribe(address) + else: + raise RuntimeWarning("No activity attached") + -- cgit v0.9.1