import logging LOGGER = logging.getLogger("sugar.tutorius.TProbe") import os import gobject import dbus import dbus.service import cPickle as pickle from . import addon from .services import ObjectStore from .properties import TPropContainer from .dbustools import remote_call, save_args import copy """ -------------------- | ProbeManager | -------------------- | V -------------------- ---------- | ProbeProxy |<---- DBus ---->| TProbe | -------------------- ---------- """ #TODO Add stub error handling for remote calls in the classes so that it will # be clearer how errors can be handled in the future. class TProbe(dbus.service.Object): """ Tutorius Probe Defines an entry point for Tutorius into activities that allows performing actions and registering events onto an activity via a DBUS Interface. """ def __init__(self, activity_name, activity): """ Create and register a TProbe for an activity. @param activity_name unique activity_id @param activity activity reference, must be a gtk container """ LOGGER.debug("TProbe :: Creating TProbe for %s (%d)", activity_name, os.getpid()) LOGGER.debug("TProbe :: Current gobject context: %s", str(gobject.main_context_default())) LOGGER.debug("TProbe :: Current gobject depth: %s", str(gobject.main_depth())) # Moving the ObjectStore assignment here, in the meantime # the reference to the activity shouldn't be share as a # global variable but passed by the Probe to the objects # that requires it self._activity = activity ObjectStore().activity = activity self._activity_name = activity_name self._session_bus = dbus.SessionBus() # Giving a new name because _name is already used by dbus self._name2 = dbus.service.BusName(activity_name, self._session_bus) dbus.service.Object.__init__(self, self._session_bus, "/tutorius/Probe") # Add the dictionary we will use to store which actions and events # are known self._installedActions = {} self._subscribedEvents = {} def start(self): """ Optional method to call if the probe is not inserted into an existing activity. Starts a gobject mainloop """ mainloop = gobject.MainLoop() print "Starting Probe for " + self._activity_name mainloop.run() @dbus.service.method("org.tutorius.ProbeInterface", in_signature='s', out_signature='') def registered(self, service): print ("Registered with: " + str(service)) @dbus.service.method("org.tutorius.ProbeInterface", in_signature='', out_signature='s') def ping(self): """ Allows testing the connection to a Probe @return string "alive" """ return "alive" # ------------------ Action handling -------------------------------------- @dbus.service.method("org.tutorius.ProbeInterface", in_signature='s', out_signature='s') def install(self, pickled_action): """ Install an action on the Activity @param pickled_action string pickled action @return string address of installed action """ loaded_action = pickle.loads(str(pickled_action)) action = addon.create(loaded_action.__class__.__name__) address = self._generate_action_reference(action) self._installedActions[address] = action if action._props: action._props.update(loaded_action._props) action.do() return address @dbus.service.method("org.tutorius.ProbeInterface", in_signature='ss', out_signature='') def update(self, address, action_props): """ Update an already registered action @param address string address returned by install() @param action_props pickled action properties @return None """ action = self._installedActions[address] if action._props: props = pickle.loads(str(action_props)) action._props.update(props) action.undo() action.do() @dbus.service.method("org.tutorius.ProbeInterface", in_signature='s', out_signature='') def uninstall(self, address): """ Uninstall an action @param address string address returned by install() @return None """ if self._installedActions.has_key(address): action = self._installedActions[address] action.undo() self._installedActions.pop(address) # ------------------ Event handling --------------------------------------- @dbus.service.method("org.tutorius.ProbeInterface", in_signature='s', out_signature='s') def subscribe(self, pickled_event): """ Subscribe to an Event @param pickled_event string pickled EventFilter @return string unique name of registered event """ #TODO Perform event unmapping once Tutorials use abstract events # instead of concrete EventFilters that are tied to their # implementation. eventfilter = pickle.loads(str(pickled_event)) # The callback uses the event defined previously and each # successive call to subscribe will register a different # callback that references a different event def callback(*args): self.notify(eventfilter) eventfilter.install_handlers(callback, activity=self._activity) name = self._generate_event_reference(eventfilter) self._subscribedEvents[name] = eventfilter return name @dbus.service.method("org.tutorius.ProbeInterface", in_signature='s', out_signature='') def unsubscribe(self, address): """ Remove subscription to an event @param address string adress returned by subscribe() @return None """ if self._subscribedEvents.has_key(address): eventfilter = self._subscribedEvents[address] eventfilter.remove_handlers() self._subscribedEvents.pop(address) @dbus.service.signal("org.tutorius.ProbeInterface") def eventOccured(self, event): # We need no processing now, the signal will be sent # when the method exit pass # The actual method we will call on the probe to send events def notify(self, event): LOGGER.debug("TProbe :: notify event %s", str(event)) #Check that this event is even allowed if event in self._subscribedEvents.values(): self.eventOccured(pickle.dumps(event)) else: raise RuntimeWarning("Attempted to raise an unregistered event") # Return a unique name for this action def _generate_action_reference(self, action): # TODO elavoie 2009-07-25 Should return a universal address name = action.__class__.__name__ suffix = 1 while self._installedActions.has_key(name+str(suffix)): suffix += 1 return name + str(suffix) # Return a unique name for this event def _generate_event_reference(self, event): # TODO elavoie 2009-07-25 Should return a universal address name = event.__class__.__name__ #Keep the counter to avoid looping all the time suffix = getattr(self, '_event_ref_suffix', 0 ) + 1 while self._subscribedEvents.has_key(name+str(suffix)): suffix += 1 #setattr(self, '_event_ref_suffix', suffix) return name + str(suffix) class ProbeProxy: """ ProbeProxy is a Proxy class for connecting to a remote TProbe. It provides an object interface to the TProbe, which requires pickled strings, across a DBus communication. """ def __init__(self, activityName): """ Constructor @param activityName unique activity id. Must be a valid dbus bus name. """ LOGGER.debug("ProbeProxy :: Creating ProbeProxy for %s (%d)", activityName, os.getpid()) LOGGER.debug("ProbeProxy :: Current gobject context: %s", str(gobject.main_context_default())) LOGGER.debug("ProbeProxy :: Current gobject depth: %s", str(gobject.main_depth())) bus = dbus.SessionBus() self._object = bus.get_object(activityName, "/tutorius/Probe") self._probe = dbus.Interface(self._object, "org.tutorius.ProbeInterface") self._actions = {} # We keep those two data structures to be able to have multiple callbacks # for the same event and be able to remove them independently # _subscribedEvents holds a list of callback addresses's for each event # _registeredCallbacks holds the functions to call for each address self._subscribedEvents = {} self._registeredCallbacks = {} self._object.connect_to_signal("eventOccured", self._handle_signal, dbus_interface="org.tutorius.ProbeInterface") def _handle_signal(self, pickled_event): event = pickle.loads(str(pickled_event)) LOGGER.debug("ProbeProxy :: Received Event : %s %s", str(event), str(event._props.items())) LOGGER.debug("ProbeProxy :: Currently %d events registered", len(self._registeredCallbacks)) if self._registeredCallbacks.has_key(event): for callback in self._registeredCallbacks[event].values(): callback(event) else: for event in self._registeredCallbacks.keys(): LOGGER.debug("==== %s", str(event._props.items())) LOGGER.debug("ProbeProxy :: Event does not appear to be registered") def isAlive(self): try: return self._probe.ping() == "alive" except: return False def __update_action(self, action, address): LOGGER.debug("ProbeProxy :: Updating action %s with address %s", str(action), str(address)) self._actions[action] = str(address) def __clear_action(self, action): self._actions.pop(action, None) def install(self, action, block=False): """ Install an action on the TProbe's activity @param action Action to install @param block Force a synchroneous dbus call if True @return None """ return remote_call(self._probe.install, (pickle.dumps(action),), save_args(self.__update_action, action), block=block) def update(self, action, newaction, block=False): """ Update an already installed action's properties and run it again @param action Action to update @param newaction Action to update it with @param block Force a synchroneous dbus call if True @return None """ #TODO review how to make this work well if not action in self._actions: raise RuntimeWarning("Action not installed") #TODO Check error handling return remote_call(self._probe.update, (self._actions[action], pickle.dumps(newaction._props)), block=block) def uninstall(self, action, block=False): """ Uninstall an installed action @param action Action to uninstall @param block Force a synchroneous dbus call if True """ if action in self._actions: remote_call(self._probe.uninstall,(self._actions.pop(action),), block=block) def __update_event(self, event, callback, address): LOGGER.debug("ProbeProxy :: Registered event %s with address %s", str(hash(event)), str(address)) # Since multiple callbacks could be associated to the same # event signature, we will store multiple callbacks # in a dictionary indexed by the unique address # given for this subscribtion and access this # dictionary from another one indexed by event address = str(address) # We use the event object as a key if not self._registeredCallbacks.has_key(event): self._registeredCallbacks[event] = {} # TODO elavoie 2009-07-25 decide on a proper exception # taxonomy if self._registeredCallbacks[event].has_key(address): # Oups, how come we have two similar addresses? # send the bad news! raise Exception("Probe subscribe exception, the following address already exists: " + str(address)) self._registeredCallbacks[event][address] = callback # We will keep another dictionary to remember the # event that was associated to this unique address # Let's copy to make sure that even if the event # passed in is modified later it won't screw up # our dictionary (python pass arguments by reference) self._subscribedEvents[address] = copy.copy(event) return address def __clear_event(self, address): LOGGER.debug("ProbeProxy :: Unregistering adress %s", str(address)) # Cleanup everything if self._subscribedEvents.has_key(address): event = self._subscribedEvents[address] if self._registeredCallbacks.has_key(event)\ and self._registeredCallbacks[event].has_key(address): self._registeredCallbacks[event].pop(address) if self._registeredCallbacks[event] == {}: self._registeredCallbacks.pop(event) self._subscribedEvents.pop(address) else: LOGGER.debug("ProbeProxy :: unsubsribe address %s inconsistency : not registered", address) def subscribe(self, event, callback, block=True): """ Register an event listener @param event Event to listen for @param callback callable that will be called when the event occurs @param block Force a synchroneous dbus call if True (Not allowed yet) @return address identifier used for unsubscribing """ LOGGER.debug("ProbeProxy :: Registering event %s", str(hash(event))) if not block: raise RuntimeError("This function does not allow non-blocking mode yet") # TODO elavoie 2009-07-25 When we will allow for patterns both # for event types and sources, we will need to revise the lookup # mecanism for which callback function to call return remote_call(self._probe.subscribe, (pickle.dumps(event),), save_args(self.__update_event, event, callback), block=block) def unsubscribe(self, address, block=True): """ Unregister an event listener @param address identifier given by subscribe() @param block Force a synchroneous dbus call if True @return None """ LOGGER.debug("ProbeProxy :: Unregister adress %s issued", str(address)) if address in self._subscribedEvents.keys(): remote_call(self._probe.unsubscribe, (address,), return_cb=save_args(self.__clear_event, address), block=block) else: LOGGER.debug("ProbeProxy :: unsubsribe address %s failed : not registered", address) def detach(self, block=False): """ Detach the ProbeProxy from it's TProbe. All installed actions and subscribed events should be removed. """ for action_addr in self._actions.keys(): self.uninstall(action_addr, block) for address in self._subscribedEvents.keys(): self.unsubscribe(address, block) class ProbeManager(object): """ The ProbeManager provides multiplexing across multiple activity ProbeProxies For now, it only handles one at a time, though. Actually it doesn't do much at all. But it keeps your encapsulation happy """ def __init__(self, proxy_class=ProbeProxy): """Constructor @param proxy_class Class to use for creating Proxies to activities. The class should support the same interface as ProbeProxy. Exists to make this class unit-testable by replacing the Proxy with a mock """ self._ProxyClass = proxy_class self._probes = {} self._current_activity = None def setCurrentActivity(self, activity_id): if not activity_id in self._probes: raise RuntimeError("Activity not attached") self._current_activity = activity_id def getCurrentActivity(self): return self._current_activity currentActivity = property(fget=getCurrentActivity, fset=setCurrentActivity) def attach(self, activity_id): if activity_id in self._probes: raise RuntimeWarning("Activity already attached") self._probes[activity_id] = self._ProxyClass(activity_id) #TODO what do we do with this? Raise something? if self._probes[activity_id].isAlive(): print "Alive!" else: print "FAil!" def detach(self, activity_id): if activity_id in self._probes: probe = self._probes.pop(activity_id) probe.detach() if self._current_activity == activity_id: self._current_activity = None def install(self, action, block=False): """ Install an action on the current activity @param action Action to install @param block Force a synchroneous dbus call if True @return None """ if self.currentActivity: return self._probes[self.currentActivity].install(action, block) else: raise RuntimeWarning("No activity attached") def update(self, action, newaction, block=False): """ Update an already installed action's properties and run it again @param action Action to update @param newaction Action to update it with @param block Force a synchroneous dbus call if True @return None """ if self.currentActivity: return self._probes[self.currentActivity].update(action, newaction, block) else: raise RuntimeWarning("No activity attached") def uninstall(self, action, block=False): """ Uninstall an installed action @param action Action to uninstall @param block Force a synchroneous dbus call if True """ if self.currentActivity: return self._probes[self.currentActivity].uninstall(action, block) else: raise RuntimeWarning("No activity attached") def subscribe(self, event, callback): """ Register an event listener @param event Event to listen for @param callback callable that will be called when the event occurs @return address identifier used for unsubscribing """ if self.currentActivity: return self._probes[self.currentActivity].subscribe(event, callback) else: raise RuntimeWarning("No activity attached") def unsubscribe(self, address): """ Unregister an event listener @param address identifier given by subscribe() @return None """ if self.currentActivity: return self._probes[self.currentActivity].unsubscribe(address) else: raise RuntimeWarning("No activity attached")