From 6a23ca18836511cfd59cd381042228fe04b751de Mon Sep 17 00:00:00 2001 From: mike Date: Sun, 22 Nov 2009 23:10:02 +0000 Subject: LP 448319 : Moved event subscription to asynchronous mode --- (limited to 'tutorius') diff --git a/tutorius/TProbe.py b/tutorius/TProbe.py index 37a5e0f..e58dd03 100644 --- a/tutorius/TProbe.py +++ b/tutorius/TProbe.py @@ -361,7 +361,7 @@ class ProbeProxy: del self._actions[this_action] break - def __update_event(self, event, callback, address): + def __update_event(self, event, callback, event_subscribed_cb, address): LOGGER.debug("ProbeProxy :: Registered event %s with address %s", str(hash(event)), str(address)) # Since multiple callbacks could be associated to the same # event signature, we will store multiple callbacks @@ -377,7 +377,7 @@ class ProbeProxy: # TODO elavoie 2009-07-25 decide on a proper exception # taxonomy if self._registeredCallbacks[event].has_key(address): - # Oups, how come we have two similar addresses? + # Oops, how come we have two similar addresses? # send the bad news! raise Exception("Probe subscribe exception, the following address already exists: " + str(address)) @@ -390,6 +390,7 @@ class ProbeProxy: # our dictionary (python pass arguments by reference) self._subscribedEvents[address] = copy.copy(event) + event_subscribed_cb(address) return address def __clear_event(self, address): @@ -409,27 +410,25 @@ class ProbeProxy: else: LOGGER.debug("ProbeProxy :: unsubsribe address %s inconsistency : not registered", address) - def subscribe(self, event, notification_cb, event_installed_cb=ignore, error_cb=logError, block=True): + def subscribe(self, event, notification_cb, event_subscribed_cb, error_cb): """ Register an event listener @param event Event to listen for @param notification_cb callable that will be called when the event occurs @param event_installed_cb callable that will be called once the event is subscribed to @param error_cb callable that will be called if the subscription fails - @param block Force a synchroneous dbus call if True (Not allowed yet) @return address identifier used for unsubscribing """ LOGGER.debug("ProbeProxy :: Registering event %s", str(hash(event))) - if not block: - raise RuntimeError("This function does not allow non-blocking mode yet") + #if not block: + # raise RuntimeError("This function does not allow non-blocking mode yet") # TODO elavoie 2009-07-25 When we will allow for patterns both # for event types and sources, we will need to revise the lookup # mecanism for which callback function to call - return remote_call(self._probe.subscribe, (pickle.dumps(event),), - return_cb=save_args(self.__update_event, event, notification_cb), - error_cb=save_args(error_cb, event), - block=block) + self._probe.subscribe(pickle.dumps(event), + reply_handler=save_args(self.__update_event, event, notification_cb, event_subscribed_cb), + error_handler=save_args(error_cb, event)) def unsubscribe(self, address, block=True): """ @@ -529,15 +528,20 @@ class ProbeManager(object): else: raise RuntimeWarning("No activity attached") - def subscribe(self, event, callback): + def subscribe(self, event, notification_cb, event_subscribed_cb, error_cb): """ Register an event listener @param event Event to listen for - @param callback callable that will be called when the event occurs + @param notification_cb callable that will be called when the event occurs + @param subscribe_cb callable that will be called once the action has been + installed + @param error_cb callable that will be called if an error happens during + installation @return address identifier used for unsubscribing """ if self.currentActivity: - return self._first_proxy(self.currentActivity).subscribe(event, callback) + return self._first_proxy(self.currentActivity).subscribe(event, notification_cb,\ + event_subscribed_cb, error_cb) else: raise RuntimeWarning("No activity attached") diff --git a/tutorius/engine.py b/tutorius/engine.py index bb2453a..b8fe988 100644 --- a/tutorius/engine.py +++ b/tutorius/engine.py @@ -1,4 +1,21 @@ +# Copyright (C) 2009, Tutorius.org +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + import logging +from heapq import heappush, heappop import dbus.mainloop.glib from jarabe.model import shell from sugar.bundle.activitybundle import ActivityBundle @@ -9,6 +26,18 @@ from .dbustools import save_args from .tutorial import Tutorial, AutomaticTransitionEvent from .translator import ResourceTranslator +# Priority values for queuable messages +STOP_MSG_PRIORITY = 5 +EVENT_NOTIFICATION_MSG_PRIORITY = 10 + +# List of runner states +RUNNER_STATE_IDLE = 0 +RUNNER_STATE_SETUP_ACTIONS = 1 +RUNNER_STATE_SETUP_EVENTS = 2 +RUNNER_STATE_AWAITING_NOTIFICATIONS = 3 +RUNNER_STATE_UNINSTALLING_ACTIONS = 4 +RUNNER_STATE_UNSUBSCRIBING_EVENTS = 5 + LOGGER = logging.getLogger("sugar.tutorius.engine") class TutorialRunner(object): @@ -23,14 +52,25 @@ class TutorialRunner(object): self._tutorial = tutorial self._pM = probeManager + # The tutorial runner's state. For more details, see : + # https://docs.google.com/Doc?docid=0AVT_nzmWT2B2ZGN3dDd2MzRfNTBka3J4bW1kaA&hl=en + self._runner_state = RUNNER_STATE_IDLE + + # The message queue is a heap, so only heap operations should be done + # on it like heappush, heappop, etc... + self._message_queue = [] + #State self._state = None - self._sEvents = set() #Subscribed Events #Cached objects self._actions = {} self._installed_actions = {} - self._install_errors = {} + self._installation_errors = {} + + # Subscribed Events + self._subscribed_events = {} + self._subscription_errors = {} #Temp FIX until event/actions have an activity id self._activity_id = None @@ -44,42 +84,70 @@ class TutorialRunner(object): self.enterState(self._tutorial.INIT) def stop(self): + if self._runner_state == RUNNER_STATE_SETUP_ACTIONS or \ + self._runner_state == RUNNER_STATE_SETUP_EVENTS: + heappush(self._message_queue, (STOP_MSG_PRIORITY, None)) + else: + self._execute_stop() + + def _execute_stop(self): self.setCurrentActivity() #Temp Hack until activity in events/actions self.enterState(self._tutorial.END) self._teardownState() self._state = None def _handleEvent(self, next_state, event): - #FIXME sanity check, log event that was not installed and ignore - self.enterState(next_state) + # Look if we are actually receiving notifications + if self._runner_state == RUNNER_STATE_AWAITING_NOTIFICATIONS: + LOGGER.debug("TutorialRunner :: Received event notification in AWAITING_NOTIFICATIONS for %s"%str(event)) + transitions = self._tutorial.get_transition_dict(self._state) + for (this_event, this_next_state_name) in transitions.values(): + if event == this_event and next_state == this_next_state_name: + self.enterState(next_state) + elif self._runner_state == RUNNER_STATE_SETUP_EVENTS: + LOGGER.debug("TutorialRunner :: Queuing event notification to go to state %s"%next_state) + # Push the message on the queue + heappush(self._message_queue, (EVENT_NOTIFICATION_MSG_PRIORITY, (next_state, event))) + # Ignore the message for all other states def _teardownState(self): if self._state is None: #No state, no teardown return - + #Clear the current actions for (action_name, action_address) in self._installed_actions.items(): LOGGER.debug("TutorialRunner :: Uninstalling action %s with address %s"%(action_name, action_address)) self._pM.uninstall(action_address) self._actions = {} self._installed_actions.clear() - self._install_errors.clear() + self._installation_errors.clear() #Clear the EventFilters - for event in self._sEvents: + for (event_name, event) in self._subscribed_events.items(): self._pM.unsubscribe(event) - self._sEvents.clear() + self._subscribed_events.clear() + self._subscription_errors.clear() def __action_installed(self, action_name, action, address): LOGGER.debug("TutorialRunner :: Action %s received address %s"%(action_name, address)) self._installed_actions[action_name] = address + # Verify if we just completed the installation of the actions for this state + self.__verify_action_install_state() + def __install_error(self, action_name, action, exception): + # TODO : Fix this as it doesn't warn the user about the problem or anything + LOGGER.debug("TutorialRunner :: Action could not be installed %s, exception was : %s"%(str(action) + str(exception))) + self._installation_errors[action_name] = exception + self.__verify_action_install_state() + + def __verify_action_install_state(self): # Do the check to see if we have finished installing all the actions by either having # received a address for it or an error message install_complete = True for (this_action_name, this_action) in self._actions.items(): - if not this_action_name in self._installed_actions.keys() and not this_action in self._install_errors.keys(): + if not this_action_name in self._installed_actions.keys() and \ + not this_action in self._installation_errors.keys(): # There's at least one uninstalled action, so we still wait install_complete = False break @@ -87,46 +155,101 @@ class TutorialRunner(object): if install_complete: LOGGER.debug("TutorialRunner :: All actions installed!") # Raise the All Actions Installed event for the TutorialRunner state - self._all_actions_installed() + self.__all_actions_installed() - def __install_error(self, action_name, action, exception): - # TODO : Fix this as it doesn't warn the user about the problem or anything - LOGGER.debug("TutorialRunner :: Action could not be installed %s, exception was : %s"%(str(action) + str(exception))) - self._install_errors[action_name] = exception + def __event_subscribed(self, event_name, event_address): + LOGGER.debug("TutorialRunner :: Event %s was subscribed to, located at address %s"%(event_name, event_address)) + self._subscribed_events[event_name] = event_address + + # Verify if we just completed the subscription of all the events for this state + self.__verify_event_install_state() + + def __subscribe_error(self, event_name, exception): + # TODO : Do correct error handling here + LOGGER.debug("TutorialRunner :: Could not subscribe to event %s, got exception : "%(event_name, str(exception))) + self._subscribed_error[event_name] = exception - def _all_actions_installed(self): + # Verify if we just completed the subscription of all the events for this state + self.__verify_event_install_state() + + def __verify_event_install_state(self): transitions = self._tutorial.get_transition_dict(self._state) + # Check to see if we completed all the event subscriptions + subscribe_complete = True + for (this_event_name, (this_event, next_state)) in transitions.items(): + if not this_event in self._subscribed_events.keys() and \ + not this_event in self._subscription_errors.keys(): + subscribe_complete = False + break + + if subscribe_complete: + LOGGER.debug("TutorialRunner : Subscribed to all events!") + self.__all_events_subscribed() + + def __all_actions_installed(self): + self._runner_state = RUNNER_STATE_SETUP_EVENTS + # Process the messages that might have been stored + self.__process_pending_messages() + + # If we processed a message that changed the runner state, we need to stop + # processing + if self._runner_state != RUNNER_STATE_SETUP_EVENTS: + return + + # Start subscribing to events + transitions = self._tutorial.get_transition_dict(self._state) + + # If there are no transitions, raise the All Events Subscribed message if len(transitions) == 0: + self.__all_events_subscribed() return + + # Send all the event registration for (event, next_state) in transitions.values(): - self._sEvents.add(self._pM.subscribe(event, save_args(self._handleEvent, next_state))) + self._pM.subscribe(event, + save_args(self._handleEvent, next_state), + save_args(self.__event_subscribed, event), + save_args(self.__subscribe_error, event)) + + def __all_events_subscribed(self): + self._runner_state = RUNNER_STATE_AWAITING_NOTIFICATIONS + self.__process_pending_messages() + + def __process_pending_messages(self): + while len(self._message_queue) != 0: + (priority, message) = heappop(self._message_queue) + + if priority == STOP_MSG_PRIORITY: + LOGGER.debug("TutorialRunner :: Stop message taken from message queue") + # We can safely ignore the rest of the events + self._message_queue = [] + self._execute_stop() + # Start removing the installed addons + #if self._runner_state == RUNNER_STATE_AWAITING_NOTIFICATIONS: + # # Start uninstalling the events + # self.__unsubscribe_events() + #if self._runner_state == RUNNER_STATE_SETUP_EVENTS: + # self.__uninstall_actions() + elif priority == EVENT_NOTIFICATION_MSG_PRIORITY: + LOGGER.debug("TutorialRunner :: Handling stored event notification for next_state %s"%message[0]) + self._handle_event(*message) def _setupState(self): if self._state is None: raise RuntimeError("Attempting to setupState without a state") - # Handle the automatic event - state_name = self._state - self._actions = self._tutorial.get_action_dict(self._state) - transitions = self._tutorial.get_transition_dict(self._state) - - for (event, next_state) in transitions.values(): - if isinstance(event, AutomaticTransitionEvent): - state_name = next_state - return state_name if len(self._actions) == 0: - self._all_actions_installed() - return state_name + self.__all_actions_installed() + return for (action_name, action) in self._actions.items(): - self._pM.install(action, save_args(self.__action_installed, action_name), - save_args(self.__install_error, action_name)) LOGGER.debug("TutorialRunner :: Installed action %s"%(action_name)) - - return state_name + self._pM.install(action, + save_args(self.__action_installed, action_name), + save_args(self.__install_error, action_name)) def enterState(self, state_name): """ @@ -141,18 +264,26 @@ class TutorialRunner(object): @param state_name The name of the state to enter in """ self.setCurrentActivity() #Temp Hack until activity in events/actions + # Set the runner state to actions setup + self._runner_state = RUNNER_STATE_SETUP_ACTIONS - # Recursive base case - if state_name == self._state: - #Nothing to do - return + real_next_state = None + skip_to_state = state_name + + # As long as we do have automatic transitions, skip them to go to the + # next state + while skip_to_state != real_next_state: + real_next_state = skip_to_state + transitions = self._tutorial.get_transition_dict(skip_to_state) + for (event, next_state) in transitions.values(): + if isinstance(event, AutomaticTransitionEvent): + skip_to_state = next_state + break self._teardownState() - self._state = state_name + self._state = real_next_state - # Recursively call the enterState in case there was an automatic - # transition in the state definition - self.enterState(self._setupState()) + self._setupState() class Engine: """ diff --git a/tutorius/translator.py b/tutorius/translator.py index d0504be..bfa1746 100644 --- a/tutorius/translator.py +++ b/tutorius/translator.py @@ -22,6 +22,7 @@ logger = logging.getLogger("ResourceTranslator") from .properties import * from .vault import Vault +from .dbustools import save_args class ResourceTranslator(object): """ @@ -53,6 +54,8 @@ class ResourceTranslator(object): self._tutorial_id = tutorial_id self._translation_mapping = {} + self._action_installed_cbs = {} + self._install_error_cbs = {} def translate_resource(self, res_value): """ @@ -133,10 +136,11 @@ class ResourceTranslator(object): # Change the list contained in the addonlist property, since # we got a copy of the list when requesting it prop_container.replace_property(propname, prop_value) - - ### ProbeManager interface for decorator ### - - ## Unchanged functions ## + + ########################################################################### + ### ProbeManager interface for decorator + + # Unchanged functions def setCurrentActivity(self, activity_id): self._probe_manager.currentActivity = activity_id @@ -150,8 +154,8 @@ class ResourceTranslator(object): def detach(self, activity_id): self._probe_manager.detach(activity_id) - def subscribe(self, event, callback): - return self._probe_manager.subscribe(event, callback) + def subscribe(self, event, notification_cb, event_subscribed_cb, error_cb): + return self._probe_manager.subscribe(event, notification_cb, event_subscribed_cb, error_cb) def unsubscribe(self, address): return self._probe_manager.unsubscribe(address) @@ -165,8 +169,24 @@ class ResourceTranslator(object): def get_registered_probes_list(self, process_name=None): return self._probe_manager.get_registered_probes_list(process_name) - ## Decorated functions ## - def install(self, action, callback, block=False): + ########################################################################### + + def action_installed(self, new_action, address): + # Update the internal mapping + self._translation_mapping[address] = new_action + + # Callback to the upper layers to inform them that the action + # was installed + action_installed_cb = self._action_installed_cbs[new_action] + action_installed_cb(address) + + def action_install_error(self, new_action, exception): + # Warn the upper layer that the installation failed + error_cb = self._install_error_cbs[new_action] + error_cb(old_action, exception) + + # Decorated functions + def install(self, action, action_installed_cb, error_cb): # Make a new copy of the action that we want to install, # because translate() changes the action and we # don't want to modify the caller's action representation @@ -174,31 +194,24 @@ class ResourceTranslator(object): # Execute the replacement self.translate(new_action) - # Send the new action to the probe manager - action_address = self._probe_manager.install(new_action, callback, block) - - # Remember the address - self._translation_mapping[action_address] = new_action + self._action_installed_cbs[new_action] = save_args(action_installed_cb, action) + self._install_error_cbs[new_action] = save_args(error_cb, action) - return action_address + # Send the new action to the probe manager + self._probe_manager.install(new_action, save_args(self.action_installed, new_action), + save_args(self.action_install_error, new_action)) - def update(self, action_address, newaction, block=False): - # TODO : Repair this as it currently doesn't work. - # Actions are being copied, then translated in install(), so they - # won't be addressable via the same object that is in the Tutorial - # Runner. + def update(self, action_address, newaction): translated_new_action = copy_module.deepcopy(newaction) self.translate(translated_new_action) self._translation_mapping[action_address] = translated_new_action - return self._probe_manager.update(action_address, translated_new_action, block) - - def uninstall(self, action_address, block=False): - return_value = self._probe_manager.uninstall(action_address, block) + self._probe_manager.update(action_address, translated_new_action, block) + def uninstall(self, action_address): if self._translation_mapping.has_key(action_address): del self._translation_mapping[action_address] - return return_value + self._probe_manager.uninstall(action_address) -- cgit v0.9.1