Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/process
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/process')
-rwxr-xr-xcherrypy/process/__init__.py14
-rwxr-xr-xcherrypy/process/plugins.py681
-rwxr-xr-xcherrypy/process/servers.py418
-rwxr-xr-xcherrypy/process/win32.py174
-rwxr-xr-xcherrypy/process/wspbus.py393
5 files changed, 1680 insertions, 0 deletions
diff --git a/cherrypy/process/__init__.py b/cherrypy/process/__init__.py
new file mode 100755
index 0000000..f15b123
--- /dev/null
+++ b/cherrypy/process/__init__.py
@@ -0,0 +1,14 @@
+"""Site container for an HTTP server.
+
+A Web Site Process Bus object is used to connect applications, servers,
+and frameworks with site-wide services such as daemonization, process
+reload, signal handling, drop privileges, PID file management, logging
+for all of these, and many more.
+
+The 'plugins' module defines a few abstract and concrete services for
+use with the bus. Some use tool-specific channels; see the documentation
+for each class.
+"""
+
+from cherrypy.process.wspbus import bus
+from cherrypy.process import plugins, servers
diff --git a/cherrypy/process/plugins.py b/cherrypy/process/plugins.py
new file mode 100755
index 0000000..488958e
--- /dev/null
+++ b/cherrypy/process/plugins.py
@@ -0,0 +1,681 @@
+"""Site services for use with a Web Site Process Bus."""
+
+import os
+import re
+import signal as _signal
+import sys
+import time
+import threading
+
+from cherrypy._cpcompat import basestring, get_daemon, get_thread_ident, ntob, set
+
+# _module__file__base is used by Autoreload to make
+# absolute any filenames retrieved from sys.modules which are not
+# already absolute paths. This is to work around Python's quirk
+# of importing the startup script and using a relative filename
+# for it in sys.modules.
+#
+# Autoreload examines sys.modules afresh every time it runs. If an application
+# changes the current directory by executing os.chdir(), then the next time
+# Autoreload runs, it will not be able to find any filenames which are
+# not absolute paths, because the current directory is not the same as when the
+# module was first imported. Autoreload will then wrongly conclude the file has
+# "changed", and initiate the shutdown/re-exec sequence.
+# See ticket #917.
+# For this workaround to have a decent probability of success, this module
+# needs to be imported as early as possible, before the app has much chance
+# to change the working directory.
+_module__file__base = os.getcwd()
+
+
+class SimplePlugin(object):
+ """Plugin base class which auto-subscribes methods for known channels."""
+
+ bus = None
+ """A :class:`Bus <cherrypy.process.wspbus.Bus>`, usually cherrypy.engine."""
+
+ def __init__(self, bus):
+ self.bus = bus
+
+ def subscribe(self):
+ """Register this object as a (multi-channel) listener on the bus."""
+ for channel in self.bus.listeners:
+ # Subscribe self.start, self.exit, etc. if present.
+ method = getattr(self, channel, None)
+ if method is not None:
+ self.bus.subscribe(channel, method)
+
+ def unsubscribe(self):
+ """Unregister this object as a listener on the bus."""
+ for channel in self.bus.listeners:
+ # Unsubscribe self.start, self.exit, etc. if present.
+ method = getattr(self, channel, None)
+ if method is not None:
+ self.bus.unsubscribe(channel, method)
+
+
+
+class SignalHandler(object):
+ """Register bus channels (and listeners) for system signals.
+
+ You can modify what signals your application listens for, and what it does
+ when it receives signals, by modifying :attr:`SignalHandler.handlers`,
+ a dict of {signal name: callback} pairs. The default set is::
+
+ handlers = {'SIGTERM': self.bus.exit,
+ 'SIGHUP': self.handle_SIGHUP,
+ 'SIGUSR1': self.bus.graceful,
+ }
+
+ The :func:`SignalHandler.handle_SIGHUP`` method calls
+ :func:`bus.restart()<cherrypy.process.wspbus.Bus.restart>`
+ if the process is daemonized, but
+ :func:`bus.exit()<cherrypy.process.wspbus.Bus.exit>`
+ if the process is attached to a TTY. This is because Unix window
+ managers tend to send SIGHUP to terminal windows when the user closes them.
+
+ Feel free to add signals which are not available on every platform. The
+ :class:`SignalHandler` will ignore errors raised from attempting to register
+ handlers for unknown signals.
+ """
+
+ handlers = {}
+ """A map from signal names (e.g. 'SIGTERM') to handlers (e.g. bus.exit)."""
+
+ signals = {}
+ """A map from signal numbers to names."""
+
+ for k, v in vars(_signal).items():
+ if k.startswith('SIG') and not k.startswith('SIG_'):
+ signals[v] = k
+ del k, v
+
+ def __init__(self, bus):
+ self.bus = bus
+ # Set default handlers
+ self.handlers = {'SIGTERM': self.bus.exit,
+ 'SIGHUP': self.handle_SIGHUP,
+ 'SIGUSR1': self.bus.graceful,
+ }
+
+ if sys.platform[:4] == 'java':
+ del self.handlers['SIGUSR1']
+ self.handlers['SIGUSR2'] = self.bus.graceful
+ self.bus.log("SIGUSR1 cannot be set on the JVM platform. "
+ "Using SIGUSR2 instead.")
+ self.handlers['SIGINT'] = self._jython_SIGINT_handler
+
+ self._previous_handlers = {}
+
+ def _jython_SIGINT_handler(self, signum=None, frame=None):
+ # See http://bugs.jython.org/issue1313
+ self.bus.log('Keyboard Interrupt: shutting down bus')
+ self.bus.exit()
+
+ def subscribe(self):
+ """Subscribe self.handlers to signals."""
+ for sig, func in self.handlers.items():
+ try:
+ self.set_handler(sig, func)
+ except ValueError:
+ pass
+
+ def unsubscribe(self):
+ """Unsubscribe self.handlers from signals."""
+ for signum, handler in self._previous_handlers.items():
+ signame = self.signals[signum]
+
+ if handler is None:
+ self.bus.log("Restoring %s handler to SIG_DFL." % signame)
+ handler = _signal.SIG_DFL
+ else:
+ self.bus.log("Restoring %s handler %r." % (signame, handler))
+
+ try:
+ our_handler = _signal.signal(signum, handler)
+ if our_handler is None:
+ self.bus.log("Restored old %s handler %r, but our "
+ "handler was not registered." %
+ (signame, handler), level=30)
+ except ValueError:
+ self.bus.log("Unable to restore %s handler %r." %
+ (signame, handler), level=40, traceback=True)
+
+ def set_handler(self, signal, listener=None):
+ """Subscribe a handler for the given signal (number or name).
+
+ If the optional 'listener' argument is provided, it will be
+ subscribed as a listener for the given signal's channel.
+
+ If the given signal name or number is not available on the current
+ platform, ValueError is raised.
+ """
+ if isinstance(signal, basestring):
+ signum = getattr(_signal, signal, None)
+ if signum is None:
+ raise ValueError("No such signal: %r" % signal)
+ signame = signal
+ else:
+ try:
+ signame = self.signals[signal]
+ except KeyError:
+ raise ValueError("No such signal: %r" % signal)
+ signum = signal
+
+ prev = _signal.signal(signum, self._handle_signal)
+ self._previous_handlers[signum] = prev
+
+ if listener is not None:
+ self.bus.log("Listening for %s." % signame)
+ self.bus.subscribe(signame, listener)
+
+ def _handle_signal(self, signum=None, frame=None):
+ """Python signal handler (self.set_handler subscribes it for you)."""
+ signame = self.signals[signum]
+ self.bus.log("Caught signal %s." % signame)
+ self.bus.publish(signame)
+
+ def handle_SIGHUP(self):
+ """Restart if daemonized, else exit."""
+ if os.isatty(sys.stdin.fileno()):
+ # not daemonized (may be foreground or background)
+ self.bus.log("SIGHUP caught but not daemonized. Exiting.")
+ self.bus.exit()
+ else:
+ self.bus.log("SIGHUP caught while daemonized. Restarting.")
+ self.bus.restart()
+
+
+try:
+ import pwd, grp
+except ImportError:
+ pwd, grp = None, None
+
+
+class DropPrivileges(SimplePlugin):
+ """Drop privileges. uid/gid arguments not available on Windows.
+
+ Special thanks to Gavin Baker: http://antonym.org/node/100.
+ """
+
+ def __init__(self, bus, umask=None, uid=None, gid=None):
+ SimplePlugin.__init__(self, bus)
+ self.finalized = False
+ self.uid = uid
+ self.gid = gid
+ self.umask = umask
+
+ def _get_uid(self):
+ return self._uid
+ def _set_uid(self, val):
+ if val is not None:
+ if pwd is None:
+ self.bus.log("pwd module not available; ignoring uid.",
+ level=30)
+ val = None
+ elif isinstance(val, basestring):
+ val = pwd.getpwnam(val)[2]
+ self._uid = val
+ uid = property(_get_uid, _set_uid,
+ doc="The uid under which to run. Availability: Unix.")
+
+ def _get_gid(self):
+ return self._gid
+ def _set_gid(self, val):
+ if val is not None:
+ if grp is None:
+ self.bus.log("grp module not available; ignoring gid.",
+ level=30)
+ val = None
+ elif isinstance(val, basestring):
+ val = grp.getgrnam(val)[2]
+ self._gid = val
+ gid = property(_get_gid, _set_gid,
+ doc="The gid under which to run. Availability: Unix.")
+
+ def _get_umask(self):
+ return self._umask
+ def _set_umask(self, val):
+ if val is not None:
+ try:
+ os.umask
+ except AttributeError:
+ self.bus.log("umask function not available; ignoring umask.",
+ level=30)
+ val = None
+ self._umask = val
+ umask = property(_get_umask, _set_umask,
+ doc="""The default permission mode for newly created files and directories.
+
+ Usually expressed in octal format, for example, ``0644``.
+ Availability: Unix, Windows.
+ """)
+
+ def start(self):
+ # uid/gid
+ def current_ids():
+ """Return the current (uid, gid) if available."""
+ name, group = None, None
+ if pwd:
+ name = pwd.getpwuid(os.getuid())[0]
+ if grp:
+ group = grp.getgrgid(os.getgid())[0]
+ return name, group
+
+ if self.finalized:
+ if not (self.uid is None and self.gid is None):
+ self.bus.log('Already running as uid: %r gid: %r' %
+ current_ids())
+ else:
+ if self.uid is None and self.gid is None:
+ if pwd or grp:
+ self.bus.log('uid/gid not set', level=30)
+ else:
+ self.bus.log('Started as uid: %r gid: %r' % current_ids())
+ if self.gid is not None:
+ os.setgid(self.gid)
+ os.setgroups([])
+ if self.uid is not None:
+ os.setuid(self.uid)
+ self.bus.log('Running as uid: %r gid: %r' % current_ids())
+
+ # umask
+ if self.finalized:
+ if self.umask is not None:
+ self.bus.log('umask already set to: %03o' % self.umask)
+ else:
+ if self.umask is None:
+ self.bus.log('umask not set', level=30)
+ else:
+ old_umask = os.umask(self.umask)
+ self.bus.log('umask old: %03o, new: %03o' %
+ (old_umask, self.umask))
+
+ self.finalized = True
+ # This is slightly higher than the priority for server.start
+ # in order to facilitate the most common use: starting on a low
+ # port (which requires root) and then dropping to another user.
+ start.priority = 77
+
+
+class Daemonizer(SimplePlugin):
+ """Daemonize the running script.
+
+ Use this with a Web Site Process Bus via::
+
+ Daemonizer(bus).subscribe()
+
+ When this component finishes, the process is completely decoupled from
+ the parent environment. Please note that when this component is used,
+ the return code from the parent process will still be 0 if a startup
+ error occurs in the forked children. Errors in the initial daemonizing
+ process still return proper exit codes. Therefore, if you use this
+ plugin to daemonize, don't use the return code as an accurate indicator
+ of whether the process fully started. In fact, that return code only
+ indicates if the process succesfully finished the first fork.
+ """
+
+ def __init__(self, bus, stdin='/dev/null', stdout='/dev/null',
+ stderr='/dev/null'):
+ SimplePlugin.__init__(self, bus)
+ self.stdin = stdin
+ self.stdout = stdout
+ self.stderr = stderr
+ self.finalized = False
+
+ def start(self):
+ if self.finalized:
+ self.bus.log('Already deamonized.')
+
+ # forking has issues with threads:
+ # http://www.opengroup.org/onlinepubs/000095399/functions/fork.html
+ # "The general problem with making fork() work in a multi-threaded
+ # world is what to do with all of the threads..."
+ # So we check for active threads:
+ if threading.activeCount() != 1:
+ self.bus.log('There are %r active threads. '
+ 'Daemonizing now may cause strange failures.' %
+ threading.enumerate(), level=30)
+
+ # See http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
+ # (or http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7)
+ # and http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
+
+ # Finish up with the current stdout/stderr
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ # Do first fork.
+ try:
+ pid = os.fork()
+ if pid == 0:
+ # This is the child process. Continue.
+ pass
+ else:
+ # This is the first parent. Exit, now that we've forked.
+ self.bus.log('Forking once.')
+ os._exit(0)
+ except OSError:
+ # Python raises OSError rather than returning negative numbers.
+ exc = sys.exc_info()[1]
+ sys.exit("%s: fork #1 failed: (%d) %s\n"
+ % (sys.argv[0], exc.errno, exc.strerror))
+
+ os.setsid()
+
+ # Do second fork
+ try:
+ pid = os.fork()
+ if pid > 0:
+ self.bus.log('Forking twice.')
+ os._exit(0) # Exit second parent
+ except OSError:
+ exc = sys.exc_info()[1]
+ sys.exit("%s: fork #2 failed: (%d) %s\n"
+ % (sys.argv[0], exc.errno, exc.strerror))
+
+ os.chdir("/")
+ os.umask(0)
+
+ si = open(self.stdin, "r")
+ so = open(self.stdout, "a+")
+ se = open(self.stderr, "a+")
+
+ # os.dup2(fd, fd2) will close fd2 if necessary,
+ # so we don't explicitly close stdin/out/err.
+ # See http://docs.python.org/lib/os-fd-ops.html
+ os.dup2(si.fileno(), sys.stdin.fileno())
+ os.dup2(so.fileno(), sys.stdout.fileno())
+ os.dup2(se.fileno(), sys.stderr.fileno())
+
+ self.bus.log('Daemonized to PID: %s' % os.getpid())
+ self.finalized = True
+ start.priority = 65
+
+
+class PIDFile(SimplePlugin):
+ """Maintain a PID file via a WSPBus."""
+
+ def __init__(self, bus, pidfile):
+ SimplePlugin.__init__(self, bus)
+ self.pidfile = pidfile
+ self.finalized = False
+
+ def start(self):
+ pid = os.getpid()
+ if self.finalized:
+ self.bus.log('PID %r already written to %r.' % (pid, self.pidfile))
+ else:
+ open(self.pidfile, "wb").write(ntob("%s" % pid, 'utf8'))
+ self.bus.log('PID %r written to %r.' % (pid, self.pidfile))
+ self.finalized = True
+ start.priority = 70
+
+ def exit(self):
+ try:
+ os.remove(self.pidfile)
+ self.bus.log('PID file removed: %r.' % self.pidfile)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ pass
+
+
+class PerpetualTimer(threading._Timer):
+ """A responsive subclass of threading._Timer whose run() method repeats.
+
+ Use this timer only when you really need a very interruptible timer;
+ this checks its 'finished' condition up to 20 times a second, which can
+ results in pretty high CPU usage
+ """
+
+ def run(self):
+ while True:
+ self.finished.wait(self.interval)
+ if self.finished.isSet():
+ return
+ try:
+ self.function(*self.args, **self.kwargs)
+ except Exception:
+ self.bus.log("Error in perpetual timer thread function %r." %
+ self.function, level=40, traceback=True)
+ # Quit on first error to avoid massive logs.
+ raise
+
+
+class BackgroundTask(threading.Thread):
+ """A subclass of threading.Thread whose run() method repeats.
+
+ Use this class for most repeating tasks. It uses time.sleep() to wait
+ for each interval, which isn't very responsive; that is, even if you call
+ self.cancel(), you'll have to wait until the sleep() call finishes before
+ the thread stops. To compensate, it defaults to being daemonic, which means
+ it won't delay stopping the whole process.
+ """
+
+ def __init__(self, interval, function, args=[], kwargs={}):
+ threading.Thread.__init__(self)
+ self.interval = interval
+ self.function = function
+ self.args = args
+ self.kwargs = kwargs
+ self.running = False
+
+ def cancel(self):
+ self.running = False
+
+ def run(self):
+ self.running = True
+ while self.running:
+ time.sleep(self.interval)
+ if not self.running:
+ return
+ try:
+ self.function(*self.args, **self.kwargs)
+ except Exception:
+ self.bus.log("Error in background task thread function %r." %
+ self.function, level=40, traceback=True)
+ # Quit on first error to avoid massive logs.
+ raise
+
+ def _set_daemon(self):
+ return True
+
+
+class Monitor(SimplePlugin):
+ """WSPBus listener to periodically run a callback in its own thread."""
+
+ callback = None
+ """The function to call at intervals."""
+
+ frequency = 60
+ """The time in seconds between callback runs."""
+
+ thread = None
+ """A :class:`BackgroundTask<cherrypy.process.plugins.BackgroundTask>` thread."""
+
+ def __init__(self, bus, callback, frequency=60, name=None):
+ SimplePlugin.__init__(self, bus)
+ self.callback = callback
+ self.frequency = frequency
+ self.thread = None
+ self.name = name
+
+ def start(self):
+ """Start our callback in its own background thread."""
+ if self.frequency > 0:
+ threadname = self.name or self.__class__.__name__
+ if self.thread is None:
+ self.thread = BackgroundTask(self.frequency, self.callback)
+ self.thread.bus = self.bus
+ self.thread.setName(threadname)
+ self.thread.start()
+ self.bus.log("Started monitor thread %r." % threadname)
+ else:
+ self.bus.log("Monitor thread %r already started." % threadname)
+ start.priority = 70
+
+ def stop(self):
+ """Stop our callback's background task thread."""
+ if self.thread is None:
+ self.bus.log("No thread running for %s." % self.name or self.__class__.__name__)
+ else:
+ if self.thread is not threading.currentThread():
+ name = self.thread.getName()
+ self.thread.cancel()
+ if not get_daemon(self.thread):
+ self.bus.log("Joining %r" % name)
+ self.thread.join()
+ self.bus.log("Stopped thread %r." % name)
+ self.thread = None
+
+ def graceful(self):
+ """Stop the callback's background task thread and restart it."""
+ self.stop()
+ self.start()
+
+
+class Autoreloader(Monitor):
+ """Monitor which re-executes the process when files change.
+
+ This :ref:`plugin<plugins>` restarts the process (via :func:`os.execv`)
+ if any of the files it monitors change (or is deleted). By default, the
+ autoreloader monitors all imported modules; you can add to the
+ set by adding to ``autoreload.files``::
+
+ cherrypy.engine.autoreload.files.add(myFile)
+
+ If there are imported files you do *not* wish to monitor, you can adjust the
+ ``match`` attribute, a regular expression. For example, to stop monitoring
+ cherrypy itself::
+
+ cherrypy.engine.autoreload.match = r'^(?!cherrypy).+'
+
+ Like all :class:`Monitor<cherrypy.process.plugins.Monitor>` plugins,
+ the autoreload plugin takes a ``frequency`` argument. The default is
+ 1 second; that is, the autoreloader will examine files once each second.
+ """
+
+ files = None
+ """The set of files to poll for modifications."""
+
+ frequency = 1
+ """The interval in seconds at which to poll for modified files."""
+
+ match = '.*'
+ """A regular expression by which to match filenames."""
+
+ def __init__(self, bus, frequency=1, match='.*'):
+ self.mtimes = {}
+ self.files = set()
+ self.match = match
+ Monitor.__init__(self, bus, self.run, frequency)
+
+ def start(self):
+ """Start our own background task thread for self.run."""
+ if self.thread is None:
+ self.mtimes = {}
+ Monitor.start(self)
+ start.priority = 70
+
+ def sysfiles(self):
+ """Return a Set of sys.modules filenames to monitor."""
+ files = set()
+ for k, m in sys.modules.items():
+ if re.match(self.match, k):
+ if hasattr(m, '__loader__') and hasattr(m.__loader__, 'archive'):
+ f = m.__loader__.archive
+ else:
+ f = getattr(m, '__file__', None)
+ if f is not None and not os.path.isabs(f):
+ # ensure absolute paths so a os.chdir() in the app doesn't break me
+ f = os.path.normpath(os.path.join(_module__file__base, f))
+ files.add(f)
+ return files
+
+ def run(self):
+ """Reload the process if registered files have been modified."""
+ for filename in self.sysfiles() | self.files:
+ if filename:
+ if filename.endswith('.pyc'):
+ filename = filename[:-1]
+
+ oldtime = self.mtimes.get(filename, 0)
+ if oldtime is None:
+ # Module with no .py file. Skip it.
+ continue
+
+ try:
+ mtime = os.stat(filename).st_mtime
+ except OSError:
+ # Either a module with no .py file, or it's been deleted.
+ mtime = None
+
+ if filename not in self.mtimes:
+ # If a module has no .py file, this will be None.
+ self.mtimes[filename] = mtime
+ else:
+ if mtime is None or mtime > oldtime:
+ # The file has been deleted or modified.
+ self.bus.log("Restarting because %s changed." % filename)
+ self.thread.cancel()
+ self.bus.log("Stopped thread %r." % self.thread.getName())
+ self.bus.restart()
+ return
+
+
+class ThreadManager(SimplePlugin):
+ """Manager for HTTP request threads.
+
+ If you have control over thread creation and destruction, publish to
+ the 'acquire_thread' and 'release_thread' channels (for each thread).
+ This will register/unregister the current thread and publish to
+ 'start_thread' and 'stop_thread' listeners in the bus as needed.
+
+ If threads are created and destroyed by code you do not control
+ (e.g., Apache), then, at the beginning of every HTTP request,
+ publish to 'acquire_thread' only. You should not publish to
+ 'release_thread' in this case, since you do not know whether
+ the thread will be re-used or not. The bus will call
+ 'stop_thread' listeners for you when it stops.
+ """
+
+ threads = None
+ """A map of {thread ident: index number} pairs."""
+
+ def __init__(self, bus):
+ self.threads = {}
+ SimplePlugin.__init__(self, bus)
+ self.bus.listeners.setdefault('acquire_thread', set())
+ self.bus.listeners.setdefault('start_thread', set())
+ self.bus.listeners.setdefault('release_thread', set())
+ self.bus.listeners.setdefault('stop_thread', set())
+
+ def acquire_thread(self):
+ """Run 'start_thread' listeners for the current thread.
+
+ If the current thread has already been seen, any 'start_thread'
+ listeners will not be run again.
+ """
+ thread_ident = get_thread_ident()
+ if thread_ident not in self.threads:
+ # We can't just use get_ident as the thread ID
+ # because some platforms reuse thread ID's.
+ i = len(self.threads) + 1
+ self.threads[thread_ident] = i
+ self.bus.publish('start_thread', i)
+
+ def release_thread(self):
+ """Release the current thread and run 'stop_thread' listeners."""
+ thread_ident = get_thread_ident()
+ i = self.threads.pop(thread_ident, None)
+ if i is not None:
+ self.bus.publish('stop_thread', i)
+
+ def stop(self):
+ """Release all threads and run all 'stop_thread' listeners."""
+ for thread_ident, i in self.threads.items():
+ self.bus.publish('stop_thread', i)
+ self.threads.clear()
+ graceful = stop
+
diff --git a/cherrypy/process/servers.py b/cherrypy/process/servers.py
new file mode 100755
index 0000000..272e843
--- /dev/null
+++ b/cherrypy/process/servers.py
@@ -0,0 +1,418 @@
+"""
+Starting in CherryPy 3.1, cherrypy.server is implemented as an
+:ref:`Engine Plugin<plugins>`. It's an instance of
+:class:`cherrypy._cpserver.Server`, which is a subclass of
+:class:`cherrypy.process.servers.ServerAdapter`. The ``ServerAdapter`` class
+is designed to control other servers, as well.
+
+Multiple servers/ports
+======================
+
+If you need to start more than one HTTP server (to serve on multiple ports, or
+protocols, etc.), you can manually register each one and then start them all
+with engine.start::
+
+ s1 = ServerAdapter(cherrypy.engine, MyWSGIServer(host='0.0.0.0', port=80))
+ s2 = ServerAdapter(cherrypy.engine, another.HTTPServer(host='127.0.0.1', SSL=True))
+ s1.subscribe()
+ s2.subscribe()
+ cherrypy.engine.start()
+
+.. index:: SCGI
+
+FastCGI/SCGI
+============
+
+There are also Flup\ **F**\ CGIServer and Flup\ **S**\ CGIServer classes in
+:mod:`cherrypy.process.servers`. To start an fcgi server, for example,
+wrap an instance of it in a ServerAdapter::
+
+ addr = ('0.0.0.0', 4000)
+ f = servers.FlupFCGIServer(application=cherrypy.tree, bindAddress=addr)
+ s = servers.ServerAdapter(cherrypy.engine, httpserver=f, bind_addr=addr)
+ s.subscribe()
+
+The :doc:`cherryd</deployguide/cherryd>` startup script will do the above for
+you via its `-f` flag.
+Note that you need to download and install `flup <http://trac.saddi.com/flup>`_
+yourself, whether you use ``cherryd`` or not.
+
+.. _fastcgi:
+.. index:: FastCGI
+
+FastCGI
+-------
+
+A very simple setup lets your cherry run with FastCGI.
+You just need the flup library,
+plus a running Apache server (with ``mod_fastcgi``) or lighttpd server.
+
+CherryPy code
+^^^^^^^^^^^^^
+
+hello.py::
+
+ #!/usr/bin/python
+ import cherrypy
+
+ class HelloWorld:
+ \"""Sample request handler class.\"""
+ def index(self):
+ return "Hello world!"
+ index.exposed = True
+
+ cherrypy.tree.mount(HelloWorld())
+ # CherryPy autoreload must be disabled for the flup server to work
+ cherrypy.config.update({'engine.autoreload_on':False})
+
+Then run :doc:`/deployguide/cherryd` with the '-f' arg::
+
+ cherryd -c <myconfig> -d -f -i hello.py
+
+Apache
+^^^^^^
+
+At the top level in httpd.conf::
+
+ FastCgiIpcDir /tmp
+ FastCgiServer /path/to/cherry.fcgi -idle-timeout 120 -processes 4
+
+And inside the relevant VirtualHost section::
+
+ # FastCGI config
+ AddHandler fastcgi-script .fcgi
+ ScriptAliasMatch (.*$) /path/to/cherry.fcgi$1
+
+Lighttpd
+^^^^^^^^
+
+For `Lighttpd <http://www.lighttpd.net/>`_ you can follow these
+instructions. Within ``lighttpd.conf`` make sure ``mod_fastcgi`` is
+active within ``server.modules``. Then, within your ``$HTTP["host"]``
+directive, configure your fastcgi script like the following::
+
+ $HTTP["url"] =~ "" {
+ fastcgi.server = (
+ "/" => (
+ "script.fcgi" => (
+ "bin-path" => "/path/to/your/script.fcgi",
+ "socket" => "/tmp/script.sock",
+ "check-local" => "disable",
+ "disable-time" => 1,
+ "min-procs" => 1,
+ "max-procs" => 1, # adjust as needed
+ ),
+ ),
+ )
+ } # end of $HTTP["url"] =~ "^/"
+
+Please see `Lighttpd FastCGI Docs
+<http://redmine.lighttpd.net/wiki/lighttpd/Docs:ModFastCGI>`_ for an explanation
+of the possible configuration options.
+"""
+
+import sys
+import time
+
+
+class ServerAdapter(object):
+ """Adapter for an HTTP server.
+
+ If you need to start more than one HTTP server (to serve on multiple
+ ports, or protocols, etc.), you can manually register each one and then
+ start them all with bus.start:
+
+ s1 = ServerAdapter(bus, MyWSGIServer(host='0.0.0.0', port=80))
+ s2 = ServerAdapter(bus, another.HTTPServer(host='127.0.0.1', SSL=True))
+ s1.subscribe()
+ s2.subscribe()
+ bus.start()
+ """
+
+ def __init__(self, bus, httpserver=None, bind_addr=None):
+ self.bus = bus
+ self.httpserver = httpserver
+ self.bind_addr = bind_addr
+ self.interrupt = None
+ self.running = False
+
+ def subscribe(self):
+ self.bus.subscribe('start', self.start)
+ self.bus.subscribe('stop', self.stop)
+
+ def unsubscribe(self):
+ self.bus.unsubscribe('start', self.start)
+ self.bus.unsubscribe('stop', self.stop)
+
+ def start(self):
+ """Start the HTTP server."""
+ if self.bind_addr is None:
+ on_what = "unknown interface (dynamic?)"
+ elif isinstance(self.bind_addr, tuple):
+ host, port = self.bind_addr
+ on_what = "%s:%s" % (host, port)
+ else:
+ on_what = "socket file: %s" % self.bind_addr
+
+ if self.running:
+ self.bus.log("Already serving on %s" % on_what)
+ return
+
+ self.interrupt = None
+ if not self.httpserver:
+ raise ValueError("No HTTP server has been created.")
+
+ # Start the httpserver in a new thread.
+ if isinstance(self.bind_addr, tuple):
+ wait_for_free_port(*self.bind_addr)
+
+ import threading
+ t = threading.Thread(target=self._start_http_thread)
+ t.setName("HTTPServer " + t.getName())
+ t.start()
+
+ self.wait()
+ self.running = True
+ self.bus.log("Serving on %s" % on_what)
+ start.priority = 75
+
+ def _start_http_thread(self):
+ """HTTP servers MUST be running in new threads, so that the
+ main thread persists to receive KeyboardInterrupt's. If an
+ exception is raised in the httpserver's thread then it's
+ trapped here, and the bus (and therefore our httpserver)
+ are shut down.
+ """
+ try:
+ self.httpserver.start()
+ except KeyboardInterrupt:
+ self.bus.log("<Ctrl-C> hit: shutting down HTTP server")
+ self.interrupt = sys.exc_info()[1]
+ self.bus.exit()
+ except SystemExit:
+ self.bus.log("SystemExit raised: shutting down HTTP server")
+ self.interrupt = sys.exc_info()[1]
+ self.bus.exit()
+ raise
+ except:
+ self.interrupt = sys.exc_info()[1]
+ self.bus.log("Error in HTTP server: shutting down",
+ traceback=True, level=40)
+ self.bus.exit()
+ raise
+
+ def wait(self):
+ """Wait until the HTTP server is ready to receive requests."""
+ while not getattr(self.httpserver, "ready", False):
+ if self.interrupt:
+ raise self.interrupt
+ time.sleep(.1)
+
+ # Wait for port to be occupied
+ if isinstance(self.bind_addr, tuple):
+ host, port = self.bind_addr
+ wait_for_occupied_port(host, port)
+
+ def stop(self):
+ """Stop the HTTP server."""
+ if self.running:
+ # stop() MUST block until the server is *truly* stopped.
+ self.httpserver.stop()
+ # Wait for the socket to be truly freed.
+ if isinstance(self.bind_addr, tuple):
+ wait_for_free_port(*self.bind_addr)
+ self.running = False
+ self.bus.log("HTTP Server %s shut down" % self.httpserver)
+ else:
+ self.bus.log("HTTP Server %s already shut down" % self.httpserver)
+ stop.priority = 25
+
+ def restart(self):
+ """Restart the HTTP server."""
+ self.stop()
+ self.start()
+
+
+class FlupCGIServer(object):
+ """Adapter for a flup.server.cgi.WSGIServer."""
+
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+ self.ready = False
+
+ def start(self):
+ """Start the CGI server."""
+ # We have to instantiate the server class here because its __init__
+ # starts a threadpool. If we do it too early, daemonize won't work.
+ from flup.server.cgi import WSGIServer
+
+ self.cgiserver = WSGIServer(*self.args, **self.kwargs)
+ self.ready = True
+ self.cgiserver.run()
+
+ def stop(self):
+ """Stop the HTTP server."""
+ self.ready = False
+
+
+class FlupFCGIServer(object):
+ """Adapter for a flup.server.fcgi.WSGIServer."""
+
+ def __init__(self, *args, **kwargs):
+ if kwargs.get('bindAddress', None) is None:
+ import socket
+ if not hasattr(socket, 'fromfd'):
+ raise ValueError(
+ 'Dynamic FCGI server not available on this platform. '
+ 'You must use a static or external one by providing a '
+ 'legal bindAddress.')
+ self.args = args
+ self.kwargs = kwargs
+ self.ready = False
+
+ def start(self):
+ """Start the FCGI server."""
+ # We have to instantiate the server class here because its __init__
+ # starts a threadpool. If we do it too early, daemonize won't work.
+ from flup.server.fcgi import WSGIServer
+ self.fcgiserver = WSGIServer(*self.args, **self.kwargs)
+ # TODO: report this bug upstream to flup.
+ # If we don't set _oldSIGs on Windows, we get:
+ # File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+ # line 108, in run
+ # self._restoreSignalHandlers()
+ # File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+ # line 156, in _restoreSignalHandlers
+ # for signum,handler in self._oldSIGs:
+ # AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
+ self.fcgiserver._installSignalHandlers = lambda: None
+ self.fcgiserver._oldSIGs = []
+ self.ready = True
+ self.fcgiserver.run()
+
+ def stop(self):
+ """Stop the HTTP server."""
+ # Forcibly stop the fcgi server main event loop.
+ self.fcgiserver._keepGoing = False
+ # Force all worker threads to die off.
+ self.fcgiserver._threadPool.maxSpare = self.fcgiserver._threadPool._idleCount
+ self.ready = False
+
+
+class FlupSCGIServer(object):
+ """Adapter for a flup.server.scgi.WSGIServer."""
+
+ def __init__(self, *args, **kwargs):
+ self.args = args
+ self.kwargs = kwargs
+ self.ready = False
+
+ def start(self):
+ """Start the SCGI server."""
+ # We have to instantiate the server class here because its __init__
+ # starts a threadpool. If we do it too early, daemonize won't work.
+ from flup.server.scgi import WSGIServer
+ self.scgiserver = WSGIServer(*self.args, **self.kwargs)
+ # TODO: report this bug upstream to flup.
+ # If we don't set _oldSIGs on Windows, we get:
+ # File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+ # line 108, in run
+ # self._restoreSignalHandlers()
+ # File "C:\Python24\Lib\site-packages\flup\server\threadedserver.py",
+ # line 156, in _restoreSignalHandlers
+ # for signum,handler in self._oldSIGs:
+ # AttributeError: 'WSGIServer' object has no attribute '_oldSIGs'
+ self.scgiserver._installSignalHandlers = lambda: None
+ self.scgiserver._oldSIGs = []
+ self.ready = True
+ self.scgiserver.run()
+
+ def stop(self):
+ """Stop the HTTP server."""
+ self.ready = False
+ # Forcibly stop the scgi server main event loop.
+ self.scgiserver._keepGoing = False
+ # Force all worker threads to die off.
+ self.scgiserver._threadPool.maxSpare = 0
+
+
+def client_host(server_host):
+ """Return the host on which a client can connect to the given listener."""
+ if server_host == '0.0.0.0':
+ # 0.0.0.0 is INADDR_ANY, which should answer on localhost.
+ return '127.0.0.1'
+ if server_host in ('::', '::0', '::0.0.0.0'):
+ # :: is IN6ADDR_ANY, which should answer on localhost.
+ # ::0 and ::0.0.0.0 are non-canonical but common ways to write IN6ADDR_ANY.
+ return '::1'
+ return server_host
+
+def check_port(host, port, timeout=1.0):
+ """Raise an error if the given port is not free on the given host."""
+ if not host:
+ raise ValueError("Host values of '' or None are not allowed.")
+ host = client_host(host)
+ port = int(port)
+
+ import socket
+
+ # AF_INET or AF_INET6 socket
+ # Get the correct address family for our host (allows IPv6 addresses)
+ try:
+ info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM)
+ except socket.gaierror:
+ if ':' in host:
+ info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
+ else:
+ info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]
+
+ for res in info:
+ af, socktype, proto, canonname, sa = res
+ s = None
+ try:
+ s = socket.socket(af, socktype, proto)
+ # See http://groups.google.com/group/cherrypy-users/
+ # browse_frm/thread/bbfe5eb39c904fe0
+ s.settimeout(timeout)
+ s.connect((host, port))
+ s.close()
+ raise IOError("Port %s is in use on %s; perhaps the previous "
+ "httpserver did not shut down properly." %
+ (repr(port), repr(host)))
+ except socket.error:
+ if s:
+ s.close()
+
+def wait_for_free_port(host, port):
+ """Wait for the specified port to become free (drop requests)."""
+ if not host:
+ raise ValueError("Host values of '' or None are not allowed.")
+
+ for trial in range(50):
+ try:
+ # we are expecting a free port, so reduce the timeout
+ check_port(host, port, timeout=0.1)
+ except IOError:
+ # Give the old server thread time to free the port.
+ time.sleep(0.1)
+ else:
+ return
+
+ raise IOError("Port %r not free on %r" % (port, host))
+
+def wait_for_occupied_port(host, port):
+ """Wait for the specified port to become active (receive requests)."""
+ if not host:
+ raise ValueError("Host values of '' or None are not allowed.")
+
+ for trial in range(50):
+ try:
+ check_port(host, port)
+ except IOError:
+ return
+ else:
+ time.sleep(.1)
+
+ raise IOError("Port %r not bound on %r" % (port, host))
diff --git a/cherrypy/process/win32.py b/cherrypy/process/win32.py
new file mode 100755
index 0000000..83f99a5
--- /dev/null
+++ b/cherrypy/process/win32.py
@@ -0,0 +1,174 @@
+"""Windows service. Requires pywin32."""
+
+import os
+import win32api
+import win32con
+import win32event
+import win32service
+import win32serviceutil
+
+from cherrypy.process import wspbus, plugins
+
+
+class ConsoleCtrlHandler(plugins.SimplePlugin):
+ """A WSPBus plugin for handling Win32 console events (like Ctrl-C)."""
+
+ def __init__(self, bus):
+ self.is_set = False
+ plugins.SimplePlugin.__init__(self, bus)
+
+ def start(self):
+ if self.is_set:
+ self.bus.log('Handler for console events already set.', level=40)
+ return
+
+ result = win32api.SetConsoleCtrlHandler(self.handle, 1)
+ if result == 0:
+ self.bus.log('Could not SetConsoleCtrlHandler (error %r)' %
+ win32api.GetLastError(), level=40)
+ else:
+ self.bus.log('Set handler for console events.', level=40)
+ self.is_set = True
+
+ def stop(self):
+ if not self.is_set:
+ self.bus.log('Handler for console events already off.', level=40)
+ return
+
+ try:
+ result = win32api.SetConsoleCtrlHandler(self.handle, 0)
+ except ValueError:
+ # "ValueError: The object has not been registered"
+ result = 1
+
+ if result == 0:
+ self.bus.log('Could not remove SetConsoleCtrlHandler (error %r)' %
+ win32api.GetLastError(), level=40)
+ else:
+ self.bus.log('Removed handler for console events.', level=40)
+ self.is_set = False
+
+ def handle(self, event):
+ """Handle console control events (like Ctrl-C)."""
+ if event in (win32con.CTRL_C_EVENT, win32con.CTRL_LOGOFF_EVENT,
+ win32con.CTRL_BREAK_EVENT, win32con.CTRL_SHUTDOWN_EVENT,
+ win32con.CTRL_CLOSE_EVENT):
+ self.bus.log('Console event %s: shutting down bus' % event)
+
+ # Remove self immediately so repeated Ctrl-C doesn't re-call it.
+ try:
+ self.stop()
+ except ValueError:
+ pass
+
+ self.bus.exit()
+ # 'First to return True stops the calls'
+ return 1
+ return 0
+
+
+class Win32Bus(wspbus.Bus):
+ """A Web Site Process Bus implementation for Win32.
+
+ Instead of time.sleep, this bus blocks using native win32event objects.
+ """
+
+ def __init__(self):
+ self.events = {}
+ wspbus.Bus.__init__(self)
+
+ def _get_state_event(self, state):
+ """Return a win32event for the given state (creating it if needed)."""
+ try:
+ return self.events[state]
+ except KeyError:
+ event = win32event.CreateEvent(None, 0, 0,
+ "WSPBus %s Event (pid=%r)" %
+ (state.name, os.getpid()))
+ self.events[state] = event
+ return event
+
+ def _get_state(self):
+ return self._state
+ def _set_state(self, value):
+ self._state = value
+ event = self._get_state_event(value)
+ win32event.PulseEvent(event)
+ state = property(_get_state, _set_state)
+
+ def wait(self, state, interval=0.1, channel=None):
+ """Wait for the given state(s), KeyboardInterrupt or SystemExit.
+
+ Since this class uses native win32event objects, the interval
+ argument is ignored.
+ """
+ if isinstance(state, (tuple, list)):
+ # Don't wait for an event that beat us to the punch ;)
+ if self.state not in state:
+ events = tuple([self._get_state_event(s) for s in state])
+ win32event.WaitForMultipleObjects(events, 0, win32event.INFINITE)
+ else:
+ # Don't wait for an event that beat us to the punch ;)
+ if self.state != state:
+ event = self._get_state_event(state)
+ win32event.WaitForSingleObject(event, win32event.INFINITE)
+
+
+class _ControlCodes(dict):
+ """Control codes used to "signal" a service via ControlService.
+
+ User-defined control codes are in the range 128-255. We generally use
+ the standard Python value for the Linux signal and add 128. Example:
+
+ >>> signal.SIGUSR1
+ 10
+ control_codes['graceful'] = 128 + 10
+ """
+
+ def key_for(self, obj):
+ """For the given value, return its corresponding key."""
+ for key, val in self.items():
+ if val is obj:
+ return key
+ raise ValueError("The given object could not be found: %r" % obj)
+
+control_codes = _ControlCodes({'graceful': 138})
+
+
+def signal_child(service, command):
+ if command == 'stop':
+ win32serviceutil.StopService(service)
+ elif command == 'restart':
+ win32serviceutil.RestartService(service)
+ else:
+ win32serviceutil.ControlService(service, control_codes[command])
+
+
+class PyWebService(win32serviceutil.ServiceFramework):
+ """Python Web Service."""
+
+ _svc_name_ = "Python Web Service"
+ _svc_display_name_ = "Python Web Service"
+ _svc_deps_ = None # sequence of service names on which this depends
+ _exe_name_ = "pywebsvc"
+ _exe_args_ = None # Default to no arguments
+
+ # Only exists on Windows 2000 or later, ignored on windows NT
+ _svc_description_ = "Python Web Service"
+
+ def SvcDoRun(self):
+ from cherrypy import process
+ process.bus.start()
+ process.bus.block()
+
+ def SvcStop(self):
+ from cherrypy import process
+ self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
+ process.bus.exit()
+
+ def SvcOther(self, control):
+ process.bus.publish(control_codes.key_for(control))
+
+
+if __name__ == '__main__':
+ win32serviceutil.HandleCommandLine(PyWebService)
diff --git a/cherrypy/process/wspbus.py b/cherrypy/process/wspbus.py
new file mode 100755
index 0000000..46cd75a
--- /dev/null
+++ b/cherrypy/process/wspbus.py
@@ -0,0 +1,393 @@
+"""An implementation of the Web Site Process Bus.
+
+This module is completely standalone, depending only on the stdlib.
+
+Web Site Process Bus
+--------------------
+
+A Bus object is used to contain and manage site-wide behavior:
+daemonization, HTTP server start/stop, process reload, signal handling,
+drop privileges, PID file management, logging for all of these,
+and many more.
+
+In addition, a Bus object provides a place for each web framework
+to register code that runs in response to site-wide events (like
+process start and stop), or which controls or otherwise interacts with
+the site-wide components mentioned above. For example, a framework which
+uses file-based templates would add known template filenames to an
+autoreload component.
+
+Ideally, a Bus object will be flexible enough to be useful in a variety
+of invocation scenarios:
+
+ 1. The deployer starts a site from the command line via a
+ framework-neutral deployment script; applications from multiple frameworks
+ are mixed in a single site. Command-line arguments and configuration
+ files are used to define site-wide components such as the HTTP server,
+ WSGI component graph, autoreload behavior, signal handling, etc.
+ 2. The deployer starts a site via some other process, such as Apache;
+ applications from multiple frameworks are mixed in a single site.
+ Autoreload and signal handling (from Python at least) are disabled.
+ 3. The deployer starts a site via a framework-specific mechanism;
+ for example, when running tests, exploring tutorials, or deploying
+ single applications from a single framework. The framework controls
+ which site-wide components are enabled as it sees fit.
+
+The Bus object in this package uses topic-based publish-subscribe
+messaging to accomplish all this. A few topic channels are built in
+('start', 'stop', 'exit', 'graceful', 'log', and 'main'). Frameworks and
+site containers are free to define their own. If a message is sent to a
+channel that has not been defined or has no listeners, there is no effect.
+
+In general, there should only ever be a single Bus object per process.
+Frameworks and site containers share a single Bus object by publishing
+messages and subscribing listeners.
+
+The Bus object works as a finite state machine which models the current
+state of the process. Bus methods move it from one state to another;
+those methods then publish to subscribed listeners on the channel for
+the new state.::
+
+ O
+ |
+ V
+ STOPPING --> STOPPED --> EXITING -> X
+ A A |
+ | \___ |
+ | \ |
+ | V V
+ STARTED <-- STARTING
+
+"""
+
+import atexit
+import os
+import sys
+import threading
+import time
+import traceback as _traceback
+import warnings
+
+from cherrypy._cpcompat import set
+
+# Here I save the value of os.getcwd(), which, if I am imported early enough,
+# will be the directory from which the startup script was run. This is needed
+# by _do_execv(), to change back to the original directory before execv()ing a
+# new process. This is a defense against the application having changed the
+# current working directory (which could make sys.executable "not found" if
+# sys.executable is a relative-path, and/or cause other problems).
+_startup_cwd = os.getcwd()
+
+class ChannelFailures(Exception):
+ """Exception raised when errors occur in a listener during Bus.publish()."""
+ delimiter = '\n'
+
+ def __init__(self, *args, **kwargs):
+ # Don't use 'super' here; Exceptions are old-style in Py2.4
+ # See http://www.cherrypy.org/ticket/959
+ Exception.__init__(self, *args, **kwargs)
+ self._exceptions = list()
+
+ def handle_exception(self):
+ """Append the current exception to self."""
+ self._exceptions.append(sys.exc_info())
+
+ def get_instances(self):
+ """Return a list of seen exception instances."""
+ return [instance for cls, instance, traceback in self._exceptions]
+
+ def __str__(self):
+ exception_strings = map(repr, self.get_instances())
+ return self.delimiter.join(exception_strings)
+
+ __repr__ = __str__
+
+ def __nonzero__(self):
+ return bool(self._exceptions)
+
+# Use a flag to indicate the state of the bus.
+class _StateEnum(object):
+ class State(object):
+ name = None
+ def __repr__(self):
+ return "states.%s" % self.name
+
+ def __setattr__(self, key, value):
+ if isinstance(value, self.State):
+ value.name = key
+ object.__setattr__(self, key, value)
+states = _StateEnum()
+states.STOPPED = states.State()
+states.STARTING = states.State()
+states.STARTED = states.State()
+states.STOPPING = states.State()
+states.EXITING = states.State()
+
+
+class Bus(object):
+ """Process state-machine and messenger for HTTP site deployment.
+
+ All listeners for a given channel are guaranteed to be called even
+ if others at the same channel fail. Each failure is logged, but
+ execution proceeds on to the next listener. The only way to stop all
+ processing from inside a listener is to raise SystemExit and stop the
+ whole server.
+ """
+
+ states = states
+ state = states.STOPPED
+ execv = False
+
+ def __init__(self):
+ self.execv = False
+ self.state = states.STOPPED
+ self.listeners = dict(
+ [(channel, set()) for channel
+ in ('start', 'stop', 'exit', 'graceful', 'log', 'main')])
+ self._priorities = {}
+
+ def subscribe(self, channel, callback, priority=None):
+ """Add the given callback at the given channel (if not present)."""
+ if channel not in self.listeners:
+ self.listeners[channel] = set()
+ self.listeners[channel].add(callback)
+
+ if priority is None:
+ priority = getattr(callback, 'priority', 50)
+ self._priorities[(channel, callback)] = priority
+
+ def unsubscribe(self, channel, callback):
+ """Discard the given callback (if present)."""
+ listeners = self.listeners.get(channel)
+ if listeners and callback in listeners:
+ listeners.discard(callback)
+ del self._priorities[(channel, callback)]
+
+ def publish(self, channel, *args, **kwargs):
+ """Return output of all subscribers for the given channel."""
+ if channel not in self.listeners:
+ return []
+
+ exc = ChannelFailures()
+ output = []
+
+ items = [(self._priorities[(channel, listener)], listener)
+ for listener in self.listeners[channel]]
+ items.sort()
+ for priority, listener in items:
+ try:
+ output.append(listener(*args, **kwargs))
+ except KeyboardInterrupt:
+ raise
+ except SystemExit, e:
+ # If we have previous errors ensure the exit code is non-zero
+ if exc and e.code == 0:
+ e.code = 1
+ raise
+ except:
+ exc.handle_exception()
+ if channel == 'log':
+ # Assume any further messages to 'log' will fail.
+ pass
+ else:
+ self.log("Error in %r listener %r" % (channel, listener),
+ level=40, traceback=True)
+ if exc:
+ raise exc
+ return output
+
+ def _clean_exit(self):
+ """An atexit handler which asserts the Bus is not running."""
+ if self.state != states.EXITING:
+ warnings.warn(
+ "The main thread is exiting, but the Bus is in the %r state; "
+ "shutting it down automatically now. You must either call "
+ "bus.block() after start(), or call bus.exit() before the "
+ "main thread exits." % self.state, RuntimeWarning)
+ self.exit()
+
+ def start(self):
+ """Start all services."""
+ atexit.register(self._clean_exit)
+
+ self.state = states.STARTING
+ self.log('Bus STARTING')
+ try:
+ self.publish('start')
+ self.state = states.STARTED
+ self.log('Bus STARTED')
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ self.log("Shutting down due to error in start listener:",
+ level=40, traceback=True)
+ e_info = sys.exc_info()
+ try:
+ self.exit()
+ except:
+ # Any stop/exit errors will be logged inside publish().
+ pass
+ raise e_info[0], e_info[1], e_info[2]
+
+ def exit(self):
+ """Stop all services and prepare to exit the process."""
+ exitstate = self.state
+ try:
+ self.stop()
+
+ self.state = states.EXITING
+ self.log('Bus EXITING')
+ self.publish('exit')
+ # This isn't strictly necessary, but it's better than seeing
+ # "Waiting for child threads to terminate..." and then nothing.
+ self.log('Bus EXITED')
+ except:
+ # This method is often called asynchronously (whether thread,
+ # signal handler, console handler, or atexit handler), so we
+ # can't just let exceptions propagate out unhandled.
+ # Assume it's been logged and just die.
+ os._exit(70) # EX_SOFTWARE
+
+ if exitstate == states.STARTING:
+ # exit() was called before start() finished, possibly due to
+ # Ctrl-C because a start listener got stuck. In this case,
+ # we could get stuck in a loop where Ctrl-C never exits the
+ # process, so we just call os.exit here.
+ os._exit(70) # EX_SOFTWARE
+
+ def restart(self):
+ """Restart the process (may close connections).
+
+ This method does not restart the process from the calling thread;
+ instead, it stops the bus and asks the main thread to call execv.
+ """
+ self.execv = True
+ self.exit()
+
+ def graceful(self):
+ """Advise all services to reload."""
+ self.log('Bus graceful')
+ self.publish('graceful')
+
+ def block(self, interval=0.1):
+ """Wait for the EXITING state, KeyboardInterrupt or SystemExit.
+
+ This function is intended to be called only by the main thread.
+ After waiting for the EXITING state, it also waits for all threads
+ to terminate, and then calls os.execv if self.execv is True. This
+ design allows another thread to call bus.restart, yet have the main
+ thread perform the actual execv call (required on some platforms).
+ """
+ try:
+ self.wait(states.EXITING, interval=interval, channel='main')
+ except (KeyboardInterrupt, IOError):
+ # The time.sleep call might raise
+ # "IOError: [Errno 4] Interrupted function call" on KBInt.
+ self.log('Keyboard Interrupt: shutting down bus')
+ self.exit()
+ except SystemExit:
+ self.log('SystemExit raised: shutting down bus')
+ self.exit()
+ raise
+
+ # Waiting for ALL child threads to finish is necessary on OS X.
+ # See http://www.cherrypy.org/ticket/581.
+ # It's also good to let them all shut down before allowing
+ # the main thread to call atexit handlers.
+ # See http://www.cherrypy.org/ticket/751.
+ self.log("Waiting for child threads to terminate...")
+ for t in threading.enumerate():
+ if t != threading.currentThread() and t.isAlive():
+ # Note that any dummy (external) threads are always daemonic.
+ if hasattr(threading.Thread, "daemon"):
+ # Python 2.6+
+ d = t.daemon
+ else:
+ d = t.isDaemon()
+ if not d:
+ self.log("Waiting for thread %s." % t.getName())
+ t.join()
+
+ if self.execv:
+ self._do_execv()
+
+ def wait(self, state, interval=0.1, channel=None):
+ """Poll for the given state(s) at intervals; publish to channel."""
+ if isinstance(state, (tuple, list)):
+ states = state
+ else:
+ states = [state]
+
+ def _wait():
+ while self.state not in states:
+ time.sleep(interval)
+ self.publish(channel)
+
+ # From http://psyco.sourceforge.net/psycoguide/bugs.html:
+ # "The compiled machine code does not include the regular polling
+ # done by Python, meaning that a KeyboardInterrupt will not be
+ # detected before execution comes back to the regular Python
+ # interpreter. Your program cannot be interrupted if caught
+ # into an infinite Psyco-compiled loop."
+ try:
+ sys.modules['psyco'].cannotcompile(_wait)
+ except (KeyError, AttributeError):
+ pass
+
+ _wait()
+
+ def _do_execv(self):
+ """Re-execute the current process.
+
+ This must be called from the main thread, because certain platforms
+ (OS X) don't allow execv to be called in a child thread very well.
+ """
+ args = sys.argv[:]
+ self.log('Re-spawning %s' % ' '.join(args))
+
+ if sys.platform[:4] == 'java':
+ from _systemrestart import SystemRestart
+ raise SystemRestart
+ else:
+ args.insert(0, sys.executable)
+ if sys.platform == 'win32':
+ args = ['"%s"' % arg for arg in args]
+
+ os.chdir(_startup_cwd)
+ os.execv(sys.executable, args)
+
+ def stop(self):
+ """Stop all services."""
+ self.state = states.STOPPING
+ self.log('Bus STOPPING')
+ self.publish('stop')
+ self.state = states.STOPPED
+ self.log('Bus STOPPED')
+
+ def start_with_callback(self, func, args=None, kwargs=None):
+ """Start 'func' in a new thread T, then start self (and return T)."""
+ if args is None:
+ args = ()
+ if kwargs is None:
+ kwargs = {}
+ args = (func,) + args
+
+ def _callback(func, *a, **kw):
+ self.wait(states.STARTED)
+ func(*a, **kw)
+ t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
+ t.setName('Bus Callback ' + t.getName())
+ t.start()
+
+ self.start()
+
+ return t
+
+ def log(self, msg="", level=20, traceback=False):
+ """Log the given message. Append the last traceback if requested."""
+ if traceback:
+ exc = sys.exc_info()
+ msg += "\n" + "".join(_traceback.format_exception(*exc))
+ self.publish('log', msg, level)
+
+bus = Bus()