diff options
Diffstat (limited to 'cherrypy/test/helper.py')
-rwxr-xr-x | cherrypy/test/helper.py | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/cherrypy/test/helper.py b/cherrypy/test/helper.py new file mode 100755 index 0000000..ff9e06c --- /dev/null +++ b/cherrypy/test/helper.py @@ -0,0 +1,476 @@ +"""A library of helper functions for the CherryPy test suite.""" + +import datetime +import logging +log = logging.getLogger(__name__) +import os +thisdir = os.path.abspath(os.path.dirname(__file__)) +serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem') + +import re +import sys +import time +import warnings + +import cherrypy +from cherrypy._cpcompat import basestring, copyitems, HTTPSConnection, ntob +from cherrypy.lib import httputil +from cherrypy.lib.reprconf import unrepr +from cherrypy.test import webtest + +import nose + +_testconfig = None + +def get_tst_config(overconf = {}): + global _testconfig + if _testconfig is None: + conf = { + 'scheme': 'http', + 'protocol': "HTTP/1.1", + 'port': 8080, + 'host': '127.0.0.1', + 'validate': False, + 'conquer': False, + 'server': 'wsgi', + } + try: + import testconfig + _conf = testconfig.config.get('supervisor', None) + if _conf is not None: + for k, v in _conf.items(): + if isinstance(v, basestring): + _conf[k] = unrepr(v) + conf.update(_conf) + except ImportError: + pass + _testconfig = conf + conf = _testconfig.copy() + conf.update(overconf) + + return conf + +class Supervisor(object): + """Base class for modeling and controlling servers during testing.""" + + def __init__(self, **kwargs): + for k, v in kwargs.items(): + if k == 'port': + setattr(self, k, int(v)) + setattr(self, k, v) + + +log_to_stderr = lambda msg, level: sys.stderr.write(msg + os.linesep) + +class LocalSupervisor(Supervisor): + """Base class for modeling/controlling servers which run in the same process. + + When the server side runs in a different process, start/stop can dump all + state between each test module easily. When the server side runs in the + same process as the client, however, we have to do a bit more work to ensure + config and mounted apps are reset between tests. + """ + + using_apache = False + using_wsgi = False + + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + cherrypy.server.httpserver = self.httpserver_class + + engine = cherrypy.engine + if hasattr(engine, "signal_handler"): + engine.signal_handler.subscribe() + if hasattr(engine, "console_control_handler"): + engine.console_control_handler.subscribe() + #engine.subscribe('log', log_to_stderr) + + def start(self, modulename=None): + """Load and start the HTTP server.""" + if modulename: + # Unhook httpserver so cherrypy.server.start() creates a new + # one (with config from setup_server, if declared). + cherrypy.server.httpserver = None + + cherrypy.engine.start() + + self.sync_apps() + + def sync_apps(self): + """Tell the server about any apps which the setup functions mounted.""" + pass + + def stop(self): + td = getattr(self, 'teardown', None) + if td: + td() + + cherrypy.engine.exit() + + for name, server in copyitems(getattr(cherrypy, 'servers', {})): + server.unsubscribe() + del cherrypy.servers[name] + + +class NativeServerSupervisor(LocalSupervisor): + """Server supervisor for the builtin HTTP server.""" + + httpserver_class = "cherrypy._cpnative_server.CPHTTPServer" + using_apache = False + using_wsgi = False + + def __str__(self): + return "Builtin HTTP Server on %s:%s" % (self.host, self.port) + + +class LocalWSGISupervisor(LocalSupervisor): + """Server supervisor for the builtin WSGI server.""" + + httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer" + using_apache = False + using_wsgi = True + + def __str__(self): + return "Builtin WSGI Server on %s:%s" % (self.host, self.port) + + def sync_apps(self): + """Hook a new WSGI app into the origin server.""" + cherrypy.server.httpserver.wsgi_app = self.get_app() + + def get_app(self, app=None): + """Obtain a new (decorated) WSGI app to hook into the origin server.""" + if app is None: + app = cherrypy.tree + + if self.conquer: + try: + import wsgiconq + except ImportError: + warnings.warn("Error importing wsgiconq. pyconquer will not run.") + else: + app = wsgiconq.WSGILogger(app, c_calls=True) + + if self.validate: + try: + from wsgiref import validate + except ImportError: + warnings.warn("Error importing wsgiref. The validator will not run.") + else: + #wraps the app in the validator + app = validate.validator(app) + + return app + + +def get_cpmodpy_supervisor(**options): + from cherrypy.test import modpy + sup = modpy.ModPythonSupervisor(**options) + sup.template = modpy.conf_cpmodpy + return sup + +def get_modpygw_supervisor(**options): + from cherrypy.test import modpy + sup = modpy.ModPythonSupervisor(**options) + sup.template = modpy.conf_modpython_gateway + sup.using_wsgi = True + return sup + +def get_modwsgi_supervisor(**options): + from cherrypy.test import modwsgi + return modwsgi.ModWSGISupervisor(**options) + +def get_modfcgid_supervisor(**options): + from cherrypy.test import modfcgid + return modfcgid.ModFCGISupervisor(**options) + +def get_modfastcgi_supervisor(**options): + from cherrypy.test import modfastcgi + return modfastcgi.ModFCGISupervisor(**options) + +def get_wsgi_u_supervisor(**options): + cherrypy.server.wsgi_version = ('u', 0) + return LocalWSGISupervisor(**options) + + +class CPWebCase(webtest.WebCase): + + script_name = "" + scheme = "http" + + available_servers = {'wsgi': LocalWSGISupervisor, + 'wsgi_u': get_wsgi_u_supervisor, + 'native': NativeServerSupervisor, + 'cpmodpy': get_cpmodpy_supervisor, + 'modpygw': get_modpygw_supervisor, + 'modwsgi': get_modwsgi_supervisor, + 'modfcgid': get_modfcgid_supervisor, + 'modfastcgi': get_modfastcgi_supervisor, + } + default_server = "wsgi" + + def _setup_server(cls, supervisor, conf): + v = sys.version.split()[0] + log.info("Python version used to run this test script: %s" % v) + log.info("CherryPy version: %s" % cherrypy.__version__) + if supervisor.scheme == "https": + ssl = " (ssl)" + else: + ssl = "" + log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl)) + log.info("PID: %s" % os.getpid()) + + cherrypy.server.using_apache = supervisor.using_apache + cherrypy.server.using_wsgi = supervisor.using_wsgi + + if sys.platform[:4] == 'java': + cherrypy.config.update({'server.nodelay': False}) + + if isinstance(conf, basestring): + parser = cherrypy.lib.reprconf.Parser() + conf = parser.dict_from_file(conf).get('global', {}) + else: + conf = conf or {} + baseconf = conf.copy() + baseconf.update({'server.socket_host': supervisor.host, + 'server.socket_port': supervisor.port, + 'server.protocol_version': supervisor.protocol, + 'environment': "test_suite", + }) + if supervisor.scheme == "https": + #baseconf['server.ssl_module'] = 'builtin' + baseconf['server.ssl_certificate'] = serverpem + baseconf['server.ssl_private_key'] = serverpem + + # helper must be imported lazily so the coverage tool + # can run against module-level statements within cherrypy. + # Also, we have to do "from cherrypy.test import helper", + # exactly like each test module does, because a relative import + # would stick a second instance of webtest in sys.modules, + # and we wouldn't be able to globally override the port anymore. + if supervisor.scheme == "https": + webtest.WebCase.HTTP_CONN = HTTPSConnection + return baseconf + _setup_server = classmethod(_setup_server) + + def setup_class(cls): + '' + #Creates a server + conf = get_tst_config() + supervisor_factory = cls.available_servers.get(conf.get('server', 'wsgi')) + if supervisor_factory is None: + raise RuntimeError('Unknown server in config: %s' % conf['server']) + supervisor = supervisor_factory(**conf) + + #Copied from "run_test_suite" + cherrypy.config.reset() + baseconf = cls._setup_server(supervisor, conf) + cherrypy.config.update(baseconf) + setup_client() + + if hasattr(cls, 'setup_server'): + # Clear the cherrypy tree and clear the wsgi server so that + # it can be updated with the new root + cherrypy.tree = cherrypy._cptree.Tree() + cherrypy.server.httpserver = None + cls.setup_server() + supervisor.start(cls.__module__) + + cls.supervisor = supervisor + setup_class = classmethod(setup_class) + + def teardown_class(cls): + '' + if hasattr(cls, 'setup_server'): + cls.supervisor.stop() + teardown_class = classmethod(teardown_class) + + def prefix(self): + return self.script_name.rstrip("/") + + def base(self): + if ((self.scheme == "http" and self.PORT == 80) or + (self.scheme == "https" and self.PORT == 443)): + port = "" + else: + port = ":%s" % self.PORT + + return "%s://%s%s%s" % (self.scheme, self.HOST, port, + self.script_name.rstrip("/")) + + def exit(self): + sys.exit() + + def getPage(self, url, headers=None, method="GET", body=None, protocol=None): + """Open the url. Return status, headers, body.""" + if self.script_name: + url = httputil.urljoin(self.script_name, url) + return webtest.WebCase.getPage(self, url, headers, method, body, protocol) + + def skip(self, msg='skipped '): + raise nose.SkipTest(msg) + + def assertErrorPage(self, status, message=None, pattern=''): + """Compare the response body with a built in error page. + + The function will optionally look for the regexp pattern, + within the exception embedded in the error page.""" + + # This will never contain a traceback + page = cherrypy._cperror.get_error_page(status, message=message) + + # First, test the response body without checking the traceback. + # Stick a match-all group (.*) in to grab the traceback. + esc = re.escape + epage = esc(page) + epage = epage.replace(esc('<pre id="traceback"></pre>'), + esc('<pre id="traceback">') + '(.*)' + esc('</pre>')) + m = re.match(ntob(epage, self.encoding), self.body, re.DOTALL) + if not m: + self._handlewebError('Error page does not match; expected:\n' + page) + return + + # Now test the pattern against the traceback + if pattern is None: + # Special-case None to mean that there should be *no* traceback. + if m and m.group(1): + self._handlewebError('Error page contains traceback') + else: + if (m is None) or ( + not re.search(ntob(re.escape(pattern), self.encoding), + m.group(1))): + msg = 'Error page does not contain %s in traceback' + self._handlewebError(msg % repr(pattern)) + + date_tolerance = 2 + + def assertEqualDates(self, dt1, dt2, seconds=None): + """Assert abs(dt1 - dt2) is within Y seconds.""" + if seconds is None: + seconds = self.date_tolerance + + if dt1 > dt2: + diff = dt1 - dt2 + else: + diff = dt2 - dt1 + if not diff < datetime.timedelta(seconds=seconds): + raise AssertionError('%r and %r are not within %r seconds.' % + (dt1, dt2, seconds)) + + +def setup_client(): + """Set up the WebCase classes to match the server's socket settings.""" + webtest.WebCase.PORT = cherrypy.server.socket_port + webtest.WebCase.HOST = cherrypy.server.socket_host + if cherrypy.server.ssl_certificate: + CPWebCase.scheme = 'https' + +# --------------------------- Spawning helpers --------------------------- # + + +class CPProcess(object): + + pid_file = os.path.join(thisdir, 'test.pid') + config_file = os.path.join(thisdir, 'test.conf') + config_template = """[global] +server.socket_host: '%(host)s' +server.socket_port: %(port)s +checker.on: False +log.screen: False +log.error_file: r'%(error_log)s' +log.access_file: r'%(access_log)s' +%(ssl)s +%(extra)s +""" + error_log = os.path.join(thisdir, 'test.error.log') + access_log = os.path.join(thisdir, 'test.access.log') + + def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None): + self.wait = wait + self.daemonize = daemonize + self.ssl = ssl + self.host = socket_host or cherrypy.server.socket_host + self.port = socket_port or cherrypy.server.socket_port + + def write_conf(self, extra=""): + if self.ssl: + serverpem = os.path.join(thisdir, 'test.pem') + ssl = """ +server.ssl_certificate: r'%s' +server.ssl_private_key: r'%s' +""" % (serverpem, serverpem) + else: + ssl = "" + + conf = self.config_template % { + 'host': self.host, + 'port': self.port, + 'error_log': self.error_log, + 'access_log': self.access_log, + 'ssl': ssl, + 'extra': extra, + } + f = open(self.config_file, 'wb') + f.write(ntob(conf, 'utf-8')) + f.close() + + def start(self, imports=None): + """Start cherryd in a subprocess.""" + cherrypy._cpserver.wait_for_free_port(self.host, self.port) + + args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'), + '-c', self.config_file, '-p', self.pid_file] + + if not isinstance(imports, (list, tuple)): + imports = [imports] + for i in imports: + if i: + args.append('-i') + args.append(i) + + if self.daemonize: + args.append('-d') + + env = os.environ.copy() + # Make sure we import the cherrypy package in which this module is defined. + grandparentdir = os.path.abspath(os.path.join(thisdir, '..', '..')) + if env.get('PYTHONPATH', ''): + env['PYTHONPATH'] = os.pathsep.join((grandparentdir, env['PYTHONPATH'])) + else: + env['PYTHONPATH'] = grandparentdir + if self.wait: + self.exit_code = os.spawnve(os.P_WAIT, sys.executable, args, env) + else: + os.spawnve(os.P_NOWAIT, sys.executable, args, env) + cherrypy._cpserver.wait_for_occupied_port(self.host, self.port) + + # Give the engine a wee bit more time to finish STARTING + if self.daemonize: + time.sleep(2) + else: + time.sleep(1) + + def get_pid(self): + return int(open(self.pid_file, 'rb').read()) + + def join(self): + """Wait for the process to exit.""" + try: + try: + # Mac, UNIX + os.wait() + except AttributeError: + # Windows + try: + pid = self.get_pid() + except IOError: + # Assume the subprocess deleted the pidfile on shutdown. + pass + else: + os.waitpid(pid, 0) + except OSError: + x = sys.exc_info()[1] + if x.args != (10, 'No child processes'): + raise + |