diff options
Diffstat (limited to 'cherrypy/test/webtest.py')
-rwxr-xr-x | cherrypy/test/webtest.py | 535 |
1 files changed, 0 insertions, 535 deletions
diff --git a/cherrypy/test/webtest.py b/cherrypy/test/webtest.py deleted file mode 100755 index 969eab0..0000000 --- a/cherrypy/test/webtest.py +++ /dev/null @@ -1,535 +0,0 @@ -"""Extensions to unittest for web frameworks. - -Use the WebCase.getPage method to request a page from your HTTP server. - -Framework Integration -===================== - -If you have control over your server process, you can handle errors -in the server-side of the HTTP conversation a bit better. You must run -both the client (your WebCase tests) and the server in the same process -(but in separate threads, obviously). - -When an error occurs in the framework, call server_error. It will print -the traceback to stdout, and keep any assertions you have from running -(the assumption is that, if the server errors, the page output will not -be of further significance to your tests). -""" - -import os -import pprint -import re -import socket -import sys -import time -import traceback -import types - -from unittest import * -from unittest import _TextTestResult - -from cherrypy._cpcompat import basestring, HTTPConnection, HTTPSConnection, unicodestr - - - -def interface(host): - """Return an IP address for a client connection given the server host. - - If the server is listening on '0.0.0.0' (INADDR_ANY) - or '::' (IN6ADDR_ANY), this will return the proper localhost.""" - if host == '0.0.0.0': - # INADDR_ANY, which should respond on localhost. - return "127.0.0.1" - if host == '::': - # IN6ADDR_ANY, which should respond on localhost. - return "::1" - return host - - -class TerseTestResult(_TextTestResult): - - def printErrors(self): - # Overridden to avoid unnecessary empty line - if self.errors or self.failures: - if self.dots or self.showAll: - self.stream.writeln() - self.printErrorList('ERROR', self.errors) - self.printErrorList('FAIL', self.failures) - - -class TerseTestRunner(TextTestRunner): - """A test runner class that displays results in textual form.""" - - def _makeResult(self): - return TerseTestResult(self.stream, self.descriptions, self.verbosity) - - def run(self, test): - "Run the given test case or test suite." - # Overridden to remove unnecessary empty lines and separators - result = self._makeResult() - test(result) - result.printErrors() - if not result.wasSuccessful(): - self.stream.write("FAILED (") - failed, errored = list(map(len, (result.failures, result.errors))) - if failed: - self.stream.write("failures=%d" % failed) - if errored: - if failed: self.stream.write(", ") - self.stream.write("errors=%d" % errored) - self.stream.writeln(")") - return result - - -class ReloadingTestLoader(TestLoader): - - def loadTestsFromName(self, name, module=None): - """Return a suite of all tests cases given a string specifier. - - The name may resolve either to a module, a test case class, a - test method within a test case class, or a callable object which - returns a TestCase or TestSuite instance. - - The method optionally resolves the names relative to a given module. - """ - parts = name.split('.') - unused_parts = [] - if module is None: - if not parts: - raise ValueError("incomplete test name: %s" % name) - else: - parts_copy = parts[:] - while parts_copy: - target = ".".join(parts_copy) - if target in sys.modules: - module = reload(sys.modules[target]) - parts = unused_parts - break - else: - try: - module = __import__(target) - parts = unused_parts - break - except ImportError: - unused_parts.insert(0,parts_copy[-1]) - del parts_copy[-1] - if not parts_copy: - raise - parts = parts[1:] - obj = module - for part in parts: - obj = getattr(obj, part) - - if type(obj) == types.ModuleType: - return self.loadTestsFromModule(obj) - elif (isinstance(obj, (type, types.ClassType)) and - issubclass(obj, TestCase)): - return self.loadTestsFromTestCase(obj) - elif type(obj) == types.UnboundMethodType: - return obj.im_class(obj.__name__) - elif hasattr(obj, '__call__'): - test = obj() - if not isinstance(test, TestCase) and \ - not isinstance(test, TestSuite): - raise ValueError("calling %s returned %s, " - "not a test" % (obj,test)) - return test - else: - raise ValueError("do not know how to make test from: %s" % obj) - - -try: - # Jython support - if sys.platform[:4] == 'java': - def getchar(): - # Hopefully this is enough - return sys.stdin.read(1) - else: - # On Windows, msvcrt.getch reads a single char without output. - import msvcrt - def getchar(): - return msvcrt.getch() -except ImportError: - # Unix getchr - import tty, termios - def getchar(): - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - ch = sys.stdin.read(1) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - return ch - - -class WebCase(TestCase): - HOST = "127.0.0.1" - PORT = 8000 - HTTP_CONN = HTTPConnection - PROTOCOL = "HTTP/1.1" - - scheme = "http" - url = None - - status = None - headers = None - body = None - - encoding = 'utf-8' - - time = None - - def get_conn(self, auto_open=False): - """Return a connection to our HTTP server.""" - if self.scheme == "https": - cls = HTTPSConnection - else: - cls = HTTPConnection - conn = cls(self.interface(), self.PORT) - # Automatically re-connect? - conn.auto_open = auto_open - conn.connect() - return conn - - def set_persistent(self, on=True, auto_open=False): - """Make our HTTP_CONN persistent (or not). - - If the 'on' argument is True (the default), then self.HTTP_CONN - will be set to an instance of HTTPConnection (or HTTPS - if self.scheme is "https"). This will then persist across requests. - - We only allow for a single open connection, so if you call this - and we currently have an open connection, it will be closed. - """ - try: - self.HTTP_CONN.close() - except (TypeError, AttributeError): - pass - - if on: - self.HTTP_CONN = self.get_conn(auto_open=auto_open) - else: - if self.scheme == "https": - self.HTTP_CONN = HTTPSConnection - else: - self.HTTP_CONN = HTTPConnection - - def _get_persistent(self): - return hasattr(self.HTTP_CONN, "__class__") - def _set_persistent(self, on): - self.set_persistent(on) - persistent = property(_get_persistent, _set_persistent) - - def interface(self): - """Return an IP address for a client connection. - - If the server is listening on '0.0.0.0' (INADDR_ANY) - or '::' (IN6ADDR_ANY), this will return the proper localhost.""" - return interface(self.HOST) - - def getPage(self, url, headers=None, method="GET", body=None, protocol=None): - """Open the url with debugging support. Return status, headers, body.""" - ServerError.on = False - - if isinstance(url, unicodestr): - url = url.encode('utf-8') - if isinstance(body, unicodestr): - body = body.encode('utf-8') - - self.url = url - self.time = None - start = time.time() - result = openURL(url, headers, method, body, self.HOST, self.PORT, - self.HTTP_CONN, protocol or self.PROTOCOL) - self.time = time.time() - start - self.status, self.headers, self.body = result - - # Build a list of request cookies from the previous response cookies. - self.cookies = [('Cookie', v) for k, v in self.headers - if k.lower() == 'set-cookie'] - - if ServerError.on: - raise ServerError() - return result - - interactive = True - console_height = 30 - - def _handlewebError(self, msg): - print("") - print(" ERROR: %s" % msg) - - if not self.interactive: - raise self.failureException(msg) - - p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> " - sys.stdout.write(p) - sys.stdout.flush() - while True: - i = getchar().upper() - if i not in "BHSUIRX": - continue - print(i.upper()) # Also prints new line - if i == "B": - for x, line in enumerate(self.body.splitlines()): - if (x + 1) % self.console_height == 0: - # The \r and comma should make the next line overwrite - sys.stdout.write("<-- More -->\r") - m = getchar().lower() - # Erase our "More" prompt - sys.stdout.write(" \r") - if m == "q": - break - print(line) - elif i == "H": - pprint.pprint(self.headers) - elif i == "S": - print(self.status) - elif i == "U": - print(self.url) - elif i == "I": - # return without raising the normal exception - return - elif i == "R": - raise self.failureException(msg) - elif i == "X": - self.exit() - sys.stdout.write(p) - sys.stdout.flush() - - def exit(self): - sys.exit() - - def assertStatus(self, status, msg=None): - """Fail if self.status != status.""" - if isinstance(status, basestring): - if not self.status == status: - if msg is None: - msg = 'Status (%r) != %r' % (self.status, status) - self._handlewebError(msg) - elif isinstance(status, int): - code = int(self.status[:3]) - if code != status: - if msg is None: - msg = 'Status (%r) != %r' % (self.status, status) - self._handlewebError(msg) - else: - # status is a tuple or list. - match = False - for s in status: - if isinstance(s, basestring): - if self.status == s: - match = True - break - elif int(self.status[:3]) == s: - match = True - break - if not match: - if msg is None: - msg = 'Status (%r) not in %r' % (self.status, status) - self._handlewebError(msg) - - def assertHeader(self, key, value=None, msg=None): - """Fail if (key, [value]) not in self.headers.""" - lowkey = key.lower() - for k, v in self.headers: - if k.lower() == lowkey: - if value is None or str(value) == v: - return v - - if msg is None: - if value is None: - msg = '%r not in headers' % key - else: - msg = '%r:%r not in headers' % (key, value) - self._handlewebError(msg) - - def assertHeaderItemValue(self, key, value, msg=None): - """Fail if the header does not contain the specified value""" - actual_value = self.assertHeader(key, msg=msg) - header_values = map(str.strip, actual_value.split(',')) - if value in header_values: - return value - - if msg is None: - msg = "%r not in %r" % (value, header_values) - self._handlewebError(msg) - - def assertNoHeader(self, key, msg=None): - """Fail if key in self.headers.""" - lowkey = key.lower() - matches = [k for k, v in self.headers if k.lower() == lowkey] - if matches: - if msg is None: - msg = '%r in headers' % key - self._handlewebError(msg) - - def assertBody(self, value, msg=None): - """Fail if value != self.body.""" - if value != self.body: - if msg is None: - msg = 'expected body:\n%r\n\nactual body:\n%r' % (value, self.body) - self._handlewebError(msg) - - def assertInBody(self, value, msg=None): - """Fail if value not in self.body.""" - if value not in self.body: - if msg is None: - msg = '%r not in body: %s' % (value, self.body) - self._handlewebError(msg) - - def assertNotInBody(self, value, msg=None): - """Fail if value in self.body.""" - if value in self.body: - if msg is None: - msg = '%r found in body' % value - self._handlewebError(msg) - - def assertMatchesBody(self, pattern, msg=None, flags=0): - """Fail if value (a regex pattern) is not in self.body.""" - if re.search(pattern, self.body, flags) is None: - if msg is None: - msg = 'No match for %r in body' % pattern - self._handlewebError(msg) - - -methods_with_bodies = ("POST", "PUT") - -def cleanHeaders(headers, method, body, host, port): - """Return request headers, with required headers added (if missing).""" - if headers is None: - headers = [] - - # Add the required Host request header if not present. - # [This specifies the host:port of the server, not the client.] - found = False - for k, v in headers: - if k.lower() == 'host': - found = True - break - if not found: - if port == 80: - headers.append(("Host", host)) - else: - headers.append(("Host", "%s:%s" % (host, port))) - - if method in methods_with_bodies: - # Stick in default type and length headers if not present - found = False - for k, v in headers: - if k.lower() == 'content-type': - found = True - break - if not found: - headers.append(("Content-Type", "application/x-www-form-urlencoded")) - headers.append(("Content-Length", str(len(body or "")))) - - return headers - - -def shb(response): - """Return status, headers, body the way we like from a response.""" - h = [] - key, value = None, None - for line in response.msg.headers: - if line: - if line[0] in " \t": - value += line.strip() - else: - if key and value: - h.append((key, value)) - key, value = line.split(":", 1) - key = key.strip() - value = value.strip() - if key and value: - h.append((key, value)) - - return "%s %s" % (response.status, response.reason), h, response.read() - - -def openURL(url, headers=None, method="GET", body=None, - host="127.0.0.1", port=8000, http_conn=HTTPConnection, - protocol="HTTP/1.1"): - """Open the given HTTP resource and return status, headers, and body.""" - - headers = cleanHeaders(headers, method, body, host, port) - - # Trying 10 times is simply in case of socket errors. - # Normal case--it should run once. - for trial in range(10): - try: - # Allow http_conn to be a class or an instance - if hasattr(http_conn, "host"): - conn = http_conn - else: - conn = http_conn(interface(host), port) - - conn._http_vsn_str = protocol - conn._http_vsn = int("".join([x for x in protocol if x.isdigit()])) - - # skip_accept_encoding argument added in python version 2.4 - if sys.version_info < (2, 4): - def putheader(self, header, value): - if header == 'Accept-Encoding' and value == 'identity': - return - self.__class__.putheader(self, header, value) - import new - conn.putheader = new.instancemethod(putheader, conn, conn.__class__) - conn.putrequest(method.upper(), url, skip_host=True) - else: - conn.putrequest(method.upper(), url, skip_host=True, - skip_accept_encoding=True) - - for key, value in headers: - conn.putheader(key, value) - conn.endheaders() - - if body is not None: - conn.send(body) - - # Handle response - response = conn.getresponse() - - s, h, b = shb(response) - - if not hasattr(http_conn, "host"): - # We made our own conn instance. Close it. - conn.close() - - return s, h, b - except socket.error: - time.sleep(0.5) - raise - - -# Add any exceptions which your web framework handles -# normally (that you don't want server_error to trap). -ignored_exceptions = [] - -# You'll want set this to True when you can't guarantee -# that each response will immediately follow each request; -# for example, when handling requests via multiple threads. -ignore_all = False - -class ServerError(Exception): - on = False - - -def server_error(exc=None): - """Server debug hook. Return True if exception handled, False if ignored. - - You probably want to wrap this, so you can still handle an error using - your framework when it's ignored. - """ - if exc is None: - exc = sys.exc_info() - - if ignore_all or exc[0] in ignored_exceptions: - return False - else: - ServerError.on = True - print("") - print("".join(traceback.format_exception(*exc))) - return True - |