Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/test/helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/test/helper.py')
-rwxr-xr-xcherrypy/test/helper.py476
1 files changed, 476 insertions, 0 deletions
diff --git a/cherrypy/test/helper.py b/cherrypy/test/helper.py
new file mode 100755
index 0000000..ff9e06c
--- /dev/null
+++ b/cherrypy/test/helper.py
@@ -0,0 +1,476 @@
+"""A library of helper functions for the CherryPy test suite."""
+
+import datetime
+import logging
+log = logging.getLogger(__name__)
+import os
+thisdir = os.path.abspath(os.path.dirname(__file__))
+serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem')
+
+import re
+import sys
+import time
+import warnings
+
+import cherrypy
+from cherrypy._cpcompat import basestring, copyitems, HTTPSConnection, ntob
+from cherrypy.lib import httputil
+from cherrypy.lib.reprconf import unrepr
+from cherrypy.test import webtest
+
+import nose
+
+_testconfig = None
+
+def get_tst_config(overconf = {}):
+ global _testconfig
+ if _testconfig is None:
+ conf = {
+ 'scheme': 'http',
+ 'protocol': "HTTP/1.1",
+ 'port': 8080,
+ 'host': '127.0.0.1',
+ 'validate': False,
+ 'conquer': False,
+ 'server': 'wsgi',
+ }
+ try:
+ import testconfig
+ _conf = testconfig.config.get('supervisor', None)
+ if _conf is not None:
+ for k, v in _conf.items():
+ if isinstance(v, basestring):
+ _conf[k] = unrepr(v)
+ conf.update(_conf)
+ except ImportError:
+ pass
+ _testconfig = conf
+ conf = _testconfig.copy()
+ conf.update(overconf)
+
+ return conf
+
+class Supervisor(object):
+ """Base class for modeling and controlling servers during testing."""
+
+ def __init__(self, **kwargs):
+ for k, v in kwargs.items():
+ if k == 'port':
+ setattr(self, k, int(v))
+ setattr(self, k, v)
+
+
+log_to_stderr = lambda msg, level: sys.stderr.write(msg + os.linesep)
+
+class LocalSupervisor(Supervisor):
+ """Base class for modeling/controlling servers which run in the same process.
+
+ When the server side runs in a different process, start/stop can dump all
+ state between each test module easily. When the server side runs in the
+ same process as the client, however, we have to do a bit more work to ensure
+ config and mounted apps are reset between tests.
+ """
+
+ using_apache = False
+ using_wsgi = False
+
+ def __init__(self, **kwargs):
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
+ cherrypy.server.httpserver = self.httpserver_class
+
+ engine = cherrypy.engine
+ if hasattr(engine, "signal_handler"):
+ engine.signal_handler.subscribe()
+ if hasattr(engine, "console_control_handler"):
+ engine.console_control_handler.subscribe()
+ #engine.subscribe('log', log_to_stderr)
+
+ def start(self, modulename=None):
+ """Load and start the HTTP server."""
+ if modulename:
+ # Unhook httpserver so cherrypy.server.start() creates a new
+ # one (with config from setup_server, if declared).
+ cherrypy.server.httpserver = None
+
+ cherrypy.engine.start()
+
+ self.sync_apps()
+
+ def sync_apps(self):
+ """Tell the server about any apps which the setup functions mounted."""
+ pass
+
+ def stop(self):
+ td = getattr(self, 'teardown', None)
+ if td:
+ td()
+
+ cherrypy.engine.exit()
+
+ for name, server in copyitems(getattr(cherrypy, 'servers', {})):
+ server.unsubscribe()
+ del cherrypy.servers[name]
+
+
+class NativeServerSupervisor(LocalSupervisor):
+ """Server supervisor for the builtin HTTP server."""
+
+ httpserver_class = "cherrypy._cpnative_server.CPHTTPServer"
+ using_apache = False
+ using_wsgi = False
+
+ def __str__(self):
+ return "Builtin HTTP Server on %s:%s" % (self.host, self.port)
+
+
+class LocalWSGISupervisor(LocalSupervisor):
+ """Server supervisor for the builtin WSGI server."""
+
+ httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer"
+ using_apache = False
+ using_wsgi = True
+
+ def __str__(self):
+ return "Builtin WSGI Server on %s:%s" % (self.host, self.port)
+
+ def sync_apps(self):
+ """Hook a new WSGI app into the origin server."""
+ cherrypy.server.httpserver.wsgi_app = self.get_app()
+
+ def get_app(self, app=None):
+ """Obtain a new (decorated) WSGI app to hook into the origin server."""
+ if app is None:
+ app = cherrypy.tree
+
+ if self.conquer:
+ try:
+ import wsgiconq
+ except ImportError:
+ warnings.warn("Error importing wsgiconq. pyconquer will not run.")
+ else:
+ app = wsgiconq.WSGILogger(app, c_calls=True)
+
+ if self.validate:
+ try:
+ from wsgiref import validate
+ except ImportError:
+ warnings.warn("Error importing wsgiref. The validator will not run.")
+ else:
+ #wraps the app in the validator
+ app = validate.validator(app)
+
+ return app
+
+
+def get_cpmodpy_supervisor(**options):
+ from cherrypy.test import modpy
+ sup = modpy.ModPythonSupervisor(**options)
+ sup.template = modpy.conf_cpmodpy
+ return sup
+
+def get_modpygw_supervisor(**options):
+ from cherrypy.test import modpy
+ sup = modpy.ModPythonSupervisor(**options)
+ sup.template = modpy.conf_modpython_gateway
+ sup.using_wsgi = True
+ return sup
+
+def get_modwsgi_supervisor(**options):
+ from cherrypy.test import modwsgi
+ return modwsgi.ModWSGISupervisor(**options)
+
+def get_modfcgid_supervisor(**options):
+ from cherrypy.test import modfcgid
+ return modfcgid.ModFCGISupervisor(**options)
+
+def get_modfastcgi_supervisor(**options):
+ from cherrypy.test import modfastcgi
+ return modfastcgi.ModFCGISupervisor(**options)
+
+def get_wsgi_u_supervisor(**options):
+ cherrypy.server.wsgi_version = ('u', 0)
+ return LocalWSGISupervisor(**options)
+
+
+class CPWebCase(webtest.WebCase):
+
+ script_name = ""
+ scheme = "http"
+
+ available_servers = {'wsgi': LocalWSGISupervisor,
+ 'wsgi_u': get_wsgi_u_supervisor,
+ 'native': NativeServerSupervisor,
+ 'cpmodpy': get_cpmodpy_supervisor,
+ 'modpygw': get_modpygw_supervisor,
+ 'modwsgi': get_modwsgi_supervisor,
+ 'modfcgid': get_modfcgid_supervisor,
+ 'modfastcgi': get_modfastcgi_supervisor,
+ }
+ default_server = "wsgi"
+
+ def _setup_server(cls, supervisor, conf):
+ v = sys.version.split()[0]
+ log.info("Python version used to run this test script: %s" % v)
+ log.info("CherryPy version: %s" % cherrypy.__version__)
+ if supervisor.scheme == "https":
+ ssl = " (ssl)"
+ else:
+ ssl = ""
+ log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl))
+ log.info("PID: %s" % os.getpid())
+
+ cherrypy.server.using_apache = supervisor.using_apache
+ cherrypy.server.using_wsgi = supervisor.using_wsgi
+
+ if sys.platform[:4] == 'java':
+ cherrypy.config.update({'server.nodelay': False})
+
+ if isinstance(conf, basestring):
+ parser = cherrypy.lib.reprconf.Parser()
+ conf = parser.dict_from_file(conf).get('global', {})
+ else:
+ conf = conf or {}
+ baseconf = conf.copy()
+ baseconf.update({'server.socket_host': supervisor.host,
+ 'server.socket_port': supervisor.port,
+ 'server.protocol_version': supervisor.protocol,
+ 'environment': "test_suite",
+ })
+ if supervisor.scheme == "https":
+ #baseconf['server.ssl_module'] = 'builtin'
+ baseconf['server.ssl_certificate'] = serverpem
+ baseconf['server.ssl_private_key'] = serverpem
+
+ # helper must be imported lazily so the coverage tool
+ # can run against module-level statements within cherrypy.
+ # Also, we have to do "from cherrypy.test import helper",
+ # exactly like each test module does, because a relative import
+ # would stick a second instance of webtest in sys.modules,
+ # and we wouldn't be able to globally override the port anymore.
+ if supervisor.scheme == "https":
+ webtest.WebCase.HTTP_CONN = HTTPSConnection
+ return baseconf
+ _setup_server = classmethod(_setup_server)
+
+ def setup_class(cls):
+ ''
+ #Creates a server
+ conf = get_tst_config()
+ supervisor_factory = cls.available_servers.get(conf.get('server', 'wsgi'))
+ if supervisor_factory is None:
+ raise RuntimeError('Unknown server in config: %s' % conf['server'])
+ supervisor = supervisor_factory(**conf)
+
+ #Copied from "run_test_suite"
+ cherrypy.config.reset()
+ baseconf = cls._setup_server(supervisor, conf)
+ cherrypy.config.update(baseconf)
+ setup_client()
+
+ if hasattr(cls, 'setup_server'):
+ # Clear the cherrypy tree and clear the wsgi server so that
+ # it can be updated with the new root
+ cherrypy.tree = cherrypy._cptree.Tree()
+ cherrypy.server.httpserver = None
+ cls.setup_server()
+ supervisor.start(cls.__module__)
+
+ cls.supervisor = supervisor
+ setup_class = classmethod(setup_class)
+
+ def teardown_class(cls):
+ ''
+ if hasattr(cls, 'setup_server'):
+ cls.supervisor.stop()
+ teardown_class = classmethod(teardown_class)
+
+ def prefix(self):
+ return self.script_name.rstrip("/")
+
+ def base(self):
+ if ((self.scheme == "http" and self.PORT == 80) or
+ (self.scheme == "https" and self.PORT == 443)):
+ port = ""
+ else:
+ port = ":%s" % self.PORT
+
+ return "%s://%s%s%s" % (self.scheme, self.HOST, port,
+ self.script_name.rstrip("/"))
+
+ def exit(self):
+ sys.exit()
+
+ def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
+ """Open the url. Return status, headers, body."""
+ if self.script_name:
+ url = httputil.urljoin(self.script_name, url)
+ return webtest.WebCase.getPage(self, url, headers, method, body, protocol)
+
+ def skip(self, msg='skipped '):
+ raise nose.SkipTest(msg)
+
+ def assertErrorPage(self, status, message=None, pattern=''):
+ """Compare the response body with a built in error page.
+
+ The function will optionally look for the regexp pattern,
+ within the exception embedded in the error page."""
+
+ # This will never contain a traceback
+ page = cherrypy._cperror.get_error_page(status, message=message)
+
+ # First, test the response body without checking the traceback.
+ # Stick a match-all group (.*) in to grab the traceback.
+ esc = re.escape
+ epage = esc(page)
+ epage = epage.replace(esc('<pre id="traceback"></pre>'),
+ esc('<pre id="traceback">') + '(.*)' + esc('</pre>'))
+ m = re.match(ntob(epage, self.encoding), self.body, re.DOTALL)
+ if not m:
+ self._handlewebError('Error page does not match; expected:\n' + page)
+ return
+
+ # Now test the pattern against the traceback
+ if pattern is None:
+ # Special-case None to mean that there should be *no* traceback.
+ if m and m.group(1):
+ self._handlewebError('Error page contains traceback')
+ else:
+ if (m is None) or (
+ not re.search(ntob(re.escape(pattern), self.encoding),
+ m.group(1))):
+ msg = 'Error page does not contain %s in traceback'
+ self._handlewebError(msg % repr(pattern))
+
+ date_tolerance = 2
+
+ def assertEqualDates(self, dt1, dt2, seconds=None):
+ """Assert abs(dt1 - dt2) is within Y seconds."""
+ if seconds is None:
+ seconds = self.date_tolerance
+
+ if dt1 > dt2:
+ diff = dt1 - dt2
+ else:
+ diff = dt2 - dt1
+ if not diff < datetime.timedelta(seconds=seconds):
+ raise AssertionError('%r and %r are not within %r seconds.' %
+ (dt1, dt2, seconds))
+
+
+def setup_client():
+ """Set up the WebCase classes to match the server's socket settings."""
+ webtest.WebCase.PORT = cherrypy.server.socket_port
+ webtest.WebCase.HOST = cherrypy.server.socket_host
+ if cherrypy.server.ssl_certificate:
+ CPWebCase.scheme = 'https'
+
+# --------------------------- Spawning helpers --------------------------- #
+
+
+class CPProcess(object):
+
+ pid_file = os.path.join(thisdir, 'test.pid')
+ config_file = os.path.join(thisdir, 'test.conf')
+ config_template = """[global]
+server.socket_host: '%(host)s'
+server.socket_port: %(port)s
+checker.on: False
+log.screen: False
+log.error_file: r'%(error_log)s'
+log.access_file: r'%(access_log)s'
+%(ssl)s
+%(extra)s
+"""
+ error_log = os.path.join(thisdir, 'test.error.log')
+ access_log = os.path.join(thisdir, 'test.access.log')
+
+ def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
+ self.wait = wait
+ self.daemonize = daemonize
+ self.ssl = ssl
+ self.host = socket_host or cherrypy.server.socket_host
+ self.port = socket_port or cherrypy.server.socket_port
+
+ def write_conf(self, extra=""):
+ if self.ssl:
+ serverpem = os.path.join(thisdir, 'test.pem')
+ ssl = """
+server.ssl_certificate: r'%s'
+server.ssl_private_key: r'%s'
+""" % (serverpem, serverpem)
+ else:
+ ssl = ""
+
+ conf = self.config_template % {
+ 'host': self.host,
+ 'port': self.port,
+ 'error_log': self.error_log,
+ 'access_log': self.access_log,
+ 'ssl': ssl,
+ 'extra': extra,
+ }
+ f = open(self.config_file, 'wb')
+ f.write(ntob(conf, 'utf-8'))
+ f.close()
+
+ def start(self, imports=None):
+ """Start cherryd in a subprocess."""
+ cherrypy._cpserver.wait_for_free_port(self.host, self.port)
+
+ args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'),
+ '-c', self.config_file, '-p', self.pid_file]
+
+ if not isinstance(imports, (list, tuple)):
+ imports = [imports]
+ for i in imports:
+ if i:
+ args.append('-i')
+ args.append(i)
+
+ if self.daemonize:
+ args.append('-d')
+
+ env = os.environ.copy()
+ # Make sure we import the cherrypy package in which this module is defined.
+ grandparentdir = os.path.abspath(os.path.join(thisdir, '..', '..'))
+ if env.get('PYTHONPATH', ''):
+ env['PYTHONPATH'] = os.pathsep.join((grandparentdir, env['PYTHONPATH']))
+ else:
+ env['PYTHONPATH'] = grandparentdir
+ if self.wait:
+ self.exit_code = os.spawnve(os.P_WAIT, sys.executable, args, env)
+ else:
+ os.spawnve(os.P_NOWAIT, sys.executable, args, env)
+ cherrypy._cpserver.wait_for_occupied_port(self.host, self.port)
+
+ # Give the engine a wee bit more time to finish STARTING
+ if self.daemonize:
+ time.sleep(2)
+ else:
+ time.sleep(1)
+
+ def get_pid(self):
+ return int(open(self.pid_file, 'rb').read())
+
+ def join(self):
+ """Wait for the process to exit."""
+ try:
+ try:
+ # Mac, UNIX
+ os.wait()
+ except AttributeError:
+ # Windows
+ try:
+ pid = self.get_pid()
+ except IOError:
+ # Assume the subprocess deleted the pidfile on shutdown.
+ pass
+ else:
+ os.waitpid(pid, 0)
+ except OSError:
+ x = sys.exc_info()[1]
+ if x.args != (10, 'No child processes'):
+ raise
+