Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/process/wspbus.py
diff options
context:
space:
mode:
authorSebastian Silva <sebastian@somosazucar.org>2011-07-09 00:17:44 (GMT)
committer Icarito <icarito@spock.(none)>2011-07-09 00:18:57 (GMT)
commit570a268e7562303690ef6b599ea244945a3100ce (patch)
tree1f772420739a73515671f73dfeb397870daa9fe0 /cherrypy/process/wspbus.py
parent365ef228a2a94708024030d3993bb9f0a152a038 (diff)
Still importing WebSDK.
Need to read up on GIT.
Diffstat (limited to 'cherrypy/process/wspbus.py')
-rwxr-xr-xcherrypy/process/wspbus.py393
1 files changed, 393 insertions, 0 deletions
diff --git a/cherrypy/process/wspbus.py b/cherrypy/process/wspbus.py
new file mode 100755
index 0000000..46cd75a
--- /dev/null
+++ b/cherrypy/process/wspbus.py
@@ -0,0 +1,393 @@
+"""An implementation of the Web Site Process Bus.
+
+This module is completely standalone, depending only on the stdlib.
+
+Web Site Process Bus
+--------------------
+
+A Bus object is used to contain and manage site-wide behavior:
+daemonization, HTTP server start/stop, process reload, signal handling,
+drop privileges, PID file management, logging for all of these,
+and many more.
+
+In addition, a Bus object provides a place for each web framework
+to register code that runs in response to site-wide events (like
+process start and stop), or which controls or otherwise interacts with
+the site-wide components mentioned above. For example, a framework which
+uses file-based templates would add known template filenames to an
+autoreload component.
+
+Ideally, a Bus object will be flexible enough to be useful in a variety
+of invocation scenarios:
+
+ 1. The deployer starts a site from the command line via a
+ framework-neutral deployment script; applications from multiple frameworks
+ are mixed in a single site. Command-line arguments and configuration
+ files are used to define site-wide components such as the HTTP server,
+ WSGI component graph, autoreload behavior, signal handling, etc.
+ 2. The deployer starts a site via some other process, such as Apache;
+ applications from multiple frameworks are mixed in a single site.
+ Autoreload and signal handling (from Python at least) are disabled.
+ 3. The deployer starts a site via a framework-specific mechanism;
+ for example, when running tests, exploring tutorials, or deploying
+ single applications from a single framework. The framework controls
+ which site-wide components are enabled as it sees fit.
+
+The Bus object in this package uses topic-based publish-subscribe
+messaging to accomplish all this. A few topic channels are built in
+('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
+site containers are free to define their own. If a message is sent to a
+channel that has not been defined or has no listeners, there is no effect.
+
+In general, there should only ever be a single Bus object per process.
+Frameworks and site containers share a single Bus object by publishing
+messages and subscribing listeners.
+
+The Bus object works as a finite state machine which models the current
+state of the process. Bus methods move it from one state to another;
+those methods then publish to subscribed listeners on the channel for
+the new state.::
+
+ O
+ |
+ V
+ STOPPING --> STOPPED --> EXITING -> X
+ A A |
+ | \___ |
+ | \ |
+ | V V
+ STARTED <-- STARTING
+
+"""
+
+import atexit
+import os
+import sys
+import threading
+import time
+import traceback as _traceback
+import warnings
+
+from cherrypy._cpcompat import set
+
+# Here I save the value of os.getcwd(), which, if I am imported early enough,
+# will be the directory from which the startup script was run. This is needed
+# by _do_execv(), to change back to the original directory before execv()ing a
+# new process. This is a defense against the application having changed the
+# current working directory (which could make sys.executable "not found" if
+# sys.executable is a relative-path, and/or cause other problems).
+_startup_cwd = os.getcwd()
+
+class ChannelFailures(Exception):
+ """Exception raised when errors occur in a listener during Bus.publish()."""
+ delimiter = '\n'
+
+ def __init__(self, *args, **kwargs):
+ # Don't use 'super' here; Exceptions are old-style in Py2.4
+ # See http://www.cherrypy.org/ticket/959
+ Exception.__init__(self, *args, **kwargs)
+ self._exceptions = list()
+
+ def handle_exception(self):
+ """Append the current exception to self."""
+ self._exceptions.append(sys.exc_info())
+
+ def get_instances(self):
+ """Return a list of seen exception instances."""
+ return [instance for cls, instance, traceback in self._exceptions]
+
+ def __str__(self):
+ exception_strings = map(repr, self.get_instances())
+ return self.delimiter.join(exception_strings)
+
+ __repr__ = __str__
+
+ def __nonzero__(self):
+ return bool(self._exceptions)
+
+# Use a flag to indicate the state of the bus.
+class _StateEnum(object):
+ class State(object):
+ name = None
+ def __repr__(self):
+ return "states.%s" % self.name
+
+ def __setattr__(self, key, value):
+ if isinstance(value, self.State):
+ value.name = key
+ object.__setattr__(self, key, value)
+states = _StateEnum()
+states.STOPPED = states.State()
+states.STARTING = states.State()
+states.STARTED = states.State()
+states.STOPPING = states.State()
+states.EXITING = states.State()
+
+
+class Bus(object):
+ """Process state-machine and messenger for HTTP site deployment.
+
+ All listeners for a given channel are guaranteed to be called even
+ if others at the same channel fail. Each failure is logged, but
+ execution proceeds on to the next listener. The only way to stop all
+ processing from inside a listener is to raise SystemExit and stop the
+ whole server.
+ """
+
+ states = states
+ state = states.STOPPED
+ execv = False
+
+ def __init__(self):
+ self.execv = False
+ self.state = states.STOPPED
+ self.listeners = dict(
+ [(channel, set()) for channel
+ in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
+ self._priorities = {}
+
+ def subscribe(self, channel, callback, priority=None):
+ """Add the given callback at the given channel (if not present)."""
+ if channel not in self.listeners:
+ self.listeners[channel] = set()
+ self.listeners[channel].add(callback)
+
+ if priority is None:
+ priority = getattr(callback, 'priority', 50)
+ self._priorities[(channel, callback)] = priority
+
+ def unsubscribe(self, channel, callback):
+ """Discard the given callback (if present)."""
+ listeners = self.listeners.get(channel)
+ if listeners and callback in listeners:
+ listeners.discard(callback)
+ del self._priorities[(channel, callback)]
+
+ def publish(self, channel, *args, **kwargs):
+ """Return output of all subscribers for the given channel."""
+ if channel not in self.listeners:
+ return []
+
+ exc = ChannelFailures()
+ output = []
+
+ items = [(self._priorities[(channel, listener)], listener)
+ for listener in self.listeners[channel]]
+ items.sort()
+ for priority, listener in items:
+ try:
+ output.append(listener(*args, **kwargs))
+ except KeyboardInterrupt:
+ raise
+ except SystemExit, e:
+ # If we have previous errors ensure the exit code is non-zero
+ if exc and e.code == 0:
+ e.code = 1
+ raise
+ except:
+ exc.handle_exception()
+ if channel == 'log':
+ # Assume any further messages to 'log' will fail.
+ pass
+ else:
+ self.log("Error in %r listener %r" % (channel, listener),
+ level=40, traceback=True)
+ if exc:
+ raise exc
+ return output
+
+ def _clean_exit(self):
+ """An atexit handler which asserts the Bus is not running."""
+ if self.state != states.EXITING:
+ warnings.warn(
+ "The main thread is exiting, but the Bus is in the %r state; "
+ "shutting it down automatically now. You must either call "
+ "bus.block() after start(), or call bus.exit() before the "
+ "main thread exits." % self.state, RuntimeWarning)
+ self.exit()
+
+ def start(self):
+ """Start all services."""
+ atexit.register(self._clean_exit)
+
+ self.state = states.STARTING
+ self.log('Bus STARTING')
+ try:
+ self.publish('start')
+ self.state = states.STARTED
+ self.log('Bus STARTED')
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ self.log("Shutting down due to error in start listener:",
+ level=40, traceback=True)
+ e_info = sys.exc_info()
+ try:
+ self.exit()
+ except:
+ # Any stop/exit errors will be logged inside publish().
+ pass
+ raise e_info[0], e_info[1], e_info[2]
+
+ def exit(self):
+ """Stop all services and prepare to exit the process."""
+ exitstate = self.state
+ try:
+ self.stop()
+
+ self.state = states.EXITING
+ self.log('Bus EXITING')
+ self.publish('exit')
+ # This isn't strictly necessary, but it's better than seeing
+ # "Waiting for child threads to terminate..." and then nothing.
+ self.log('Bus EXITED')
+ except:
+ # This method is often called asynchronously (whether thread,
+ # signal handler, console handler, or atexit handler), so we
+ # can't just let exceptions propagate out unhandled.
+ # Assume it's been logged and just die.
+ os._exit(70) # EX_SOFTWARE
+
+ if exitstate == states.STARTING:
+ # exit() was called before start() finished, possibly due to
+ # Ctrl-C because a start listener got stuck. In this case,
+ # we could get stuck in a loop where Ctrl-C never exits the
+ # process, so we just call os.exit here.
+ os._exit(70) # EX_SOFTWARE
+
+ def restart(self):
+ """Restart the process (may close connections).
+
+ This method does not restart the process from the calling thread;
+ instead, it stops the bus and asks the main thread to call execv.
+ """
+ self.execv = True
+ self.exit()
+
+ def graceful(self):
+ """Advise all services to reload."""
+ self.log('Bus graceful')
+ self.publish('graceful')
+
+ def block(self, interval=0.1):
+ """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
+
+ This function is intended to be called only by the main thread.
+ After waiting for the EXITING state, it also waits for all threads
+ to terminate, and then calls os.execv if self.execv is True. This
+ design allows another thread to call bus.restart, yet have the main
+ thread perform the actual execv call (required on some platforms).
+ """
+ try:
+ self.wait(states.EXITING, interval=interval, channel='main')
+ except (KeyboardInterrupt, IOError):
+ # The time.sleep call might raise
+ # "IOError: [Errno 4] Interrupted function call" on KBInt.
+ self.log('Keyboard Interrupt: shutting down bus')
+ self.exit()
+ except SystemExit:
+ self.log('SystemExit raised: shutting down bus')
+ self.exit()
+ raise
+
+ # Waiting for ALL child threads to finish is necessary on OS X.
+ # See http://www.cherrypy.org/ticket/581.
+ # It's also good to let them all shut down before allowing
+ # the main thread to call atexit handlers.
+ # See http://www.cherrypy.org/ticket/751.
+ self.log("Waiting for child threads to terminate...")
+ for t in threading.enumerate():
+ if t != threading.currentThread() and t.isAlive():
+ # Note that any dummy (external) threads are always daemonic.
+ if hasattr(threading.Thread, "daemon"):
+ # Python 2.6+
+ d = t.daemon
+ else:
+ d = t.isDaemon()
+ if not d:
+ self.log("Waiting for thread %s." % t.getName())
+ t.join()
+
+ if self.execv:
+ self._do_execv()
+
+ def wait(self, state, interval=0.1, channel=None):
+ """Poll for the given state(s) at intervals; publish to channel."""
+ if isinstance(state, (tuple, list)):
+ states = state
+ else:
+ states = [state]
+
+ def _wait():
+ while self.state not in states:
+ time.sleep(interval)
+ self.publish(channel)
+
+ # From http://psyco.sourceforge.net/psycoguide/bugs.html:
+ # "The compiled machine code does not include the regular polling
+ # done by Python, meaning that a KeyboardInterrupt will not be
+ # detected before execution comes back to the regular Python
+ # interpreter. Your program cannot be interrupted if caught
+ # into an infinite Psyco-compiled loop."
+ try:
+ sys.modules['psyco'].cannotcompile(_wait)
+ except (KeyError, AttributeError):
+ pass
+
+ _wait()
+
+ def _do_execv(self):
+ """Re-execute the current process.
+
+ This must be called from the main thread, because certain platforms
+ (OS X) don't allow execv to be called in a child thread very well.
+ """
+ args = sys.argv[:]
+ self.log('Re-spawning %s' % ' '.join(args))
+
+ if sys.platform[:4] == 'java':
+ from _systemrestart import SystemRestart
+ raise SystemRestart
+ else:
+ args.insert(0, sys.executable)
+ if sys.platform == 'win32':
+ args = ['"%s"' % arg for arg in args]
+
+ os.chdir(_startup_cwd)
+ os.execv(sys.executable, args)
+
+ def stop(self):
+ """Stop all services."""
+ self.state = states.STOPPING
+ self.log('Bus STOPPING')
+ self.publish('stop')
+ self.state = states.STOPPED
+ self.log('Bus STOPPED')
+
+ def start_with_callback(self, func, args=None, kwargs=None):
+ """Start 'func' in a new thread T, then start self (and return T)."""
+ if args is None:
+ args = ()
+ if kwargs is None:
+ kwargs = {}
+ args = (func,) + args
+
+ def _callback(func, *a, **kw):
+ self.wait(states.STARTED)
+ func(*a, **kw)
+ t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
+ t.setName('Bus Callback ' + t.getName())
+ t.start()
+
+ self.start()
+
+ return t
+
+ def log(self, msg="", level=20, traceback=False):
+ """Log the given message. Append the last traceback if requested."""
+ if traceback:
+ exc = sys.exc_info()
+ msg += "\n" + "".join(_traceback.format_exception(*exc))
+ self.publish('log', msg, level)
+
+bus = Bus()