diff options
Diffstat (limited to 'websdk/werkzeug/testsuite')
47 files changed, 5214 insertions, 0 deletions
diff --git a/websdk/werkzeug/testsuite/__init__.py b/websdk/werkzeug/testsuite/__init__.py new file mode 100644 index 0000000..e1c8acd --- /dev/null +++ b/websdk/werkzeug/testsuite/__init__.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite + ~~~~~~~~~~~~~~~~~~ + + Contains all test Werkzeug tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest +from werkzeug.utils import import_string, find_modules + + +def iter_suites(package): + """Yields all testsuites.""" + for module in find_modules(package, include_packages=True): + mod = import_string(module) + if hasattr(mod, 'suite'): + yield mod.suite() + + +def find_all_tests(suite): + """Yields all the tests and their names from a given suite.""" + suites = [suite] + while suites: + s = suites.pop() + try: + suites.extend(s) + except TypeError: + yield s, '%s.%s.%s' % ( + s.__class__.__module__, + s.__class__.__name__, + s._testMethodName + ) + + +class WerkzeugTestCase(unittest.TestCase): + """Baseclass for all the tests that Werkzeug uses. Use these + methods for testing instead of the camelcased ones in the + baseclass for consistency. + """ + + def setup(self): + pass + + def teardown(self): + pass + + def setUp(self): + self.setup() + + def tearDown(self): + unittest.TestCase.tearDown(self) + self.teardown() + + def assert_equal(self, x, y): + return self.assertEqual(x, y) + + def assert_not_equal(self, x, y): + return self.assertNotEqual(x, y) + + def assert_raises(self, exc_type, callable=None, *args, **kwargs): + catcher = _ExceptionCatcher(self, exc_type) + if callable is None: + return catcher + with catcher: + callable(*args, **kwargs) + + +class _ExceptionCatcher(object): + + def __init__(self, test_case, exc_type): + self.test_case = test_case + self.exc_type = exc_type + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + exception_name = self.exc_type.__name__ + if exc_type is None: + self.test_case.fail('Expected exception of type %r' % + exception_name) + elif not issubclass(exc_type, self.exc_type): + raise exc_type, exc_value, tb + return True + + +class BetterLoader(unittest.TestLoader): + """A nicer loader that solves two problems. First of all we are setting + up tests from different sources and we're doing this programmatically + which breaks the default loading logic so this is required anyways. + Secondly this loader has a nicer interpolation for test names than the + default one so you can just do ``run-tests.py ViewTestCase`` and it + will work. + """ + + def getRootSuite(self): + return suite() + + def loadTestsFromName(self, name, module=None): + root = self.getRootSuite() + if name == 'suite': + return root + + all_tests = [] + for testcase, testname in find_all_tests(root): + if testname == name or \ + testname.endswith('.' + name) or \ + ('.' + name + '.') in testname or \ + testname.startswith(name + '.'): + all_tests.append(testcase) + + if not all_tests: + raise LookupError('could not find test case for "%s"' % name) + + if len(all_tests) == 1: + return all_tests[0] + rv = unittest.TestSuite() + for test in all_tests: + rv.addTest(test) + return rv + + +def suite(): + """A testsuite that has all the Flask tests. You can use this + function to integrate the Flask tests into your own testsuite + in case you want to test that monkeypatches to Flask do not + break it. + """ + suite = unittest.TestSuite() + for other_suite in iter_suites(__name__): + suite.addTest(other_suite) + return suite + + +def main(): + """Runs the testsuite as command line application.""" + try: + unittest.main(testLoader=BetterLoader(), defaultTest='suite') + except Exception, e: + print 'Error: %s' % e diff --git a/websdk/werkzeug/testsuite/compat.py b/websdk/werkzeug/testsuite/compat.py new file mode 100644 index 0000000..f5487c3 --- /dev/null +++ b/websdk/werkzeug/testsuite/compat.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.compat + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + Ensure that old stuff does not break on update. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +import warnings +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.wrappers import Response +from werkzeug.test import create_environ + + +class CompatTestCase(WerkzeugTestCase): + + def test_old_imports(self): + from werkzeug.utils import Headers, MultiDict, CombinedMultiDict, \ + Headers, EnvironHeaders + from werkzeug.http import Accept, MIMEAccept, CharsetAccept, \ + LanguageAccept, ETags, HeaderSet, WWWAuthenticate, \ + Authorization + + def test_exposed_werkzeug_mod(self): + import werkzeug + for key in werkzeug.__all__: + # deprecated, skip it + if key in ('templates', 'Template'): + continue + getattr(werkzeug, key) + + def test_fix_headers_in_response(self): + # ignore some warnings werkzeug emits for backwards compat + for msg in ['called into deprecated fix_headers', + 'fix_headers changed behavior']: + warnings.filterwarnings('ignore', message=msg, + category=DeprecationWarning) + + class MyResponse(Response): + def fix_headers(self, environ): + Response.fix_headers(self, environ) + self.headers['x-foo'] = "meh" + myresp = MyResponse('Foo') + resp = Response.from_app(myresp, create_environ(method='GET')) + assert resp.headers['x-foo'] == 'meh' + assert resp.data == 'Foo' + + warnings.resetwarnings() + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(CompatTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/__init__.py b/websdk/werkzeug/testsuite/contrib/__init__.py new file mode 100644 index 0000000..b36a4ef --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.contrib + ~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the contrib modules. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +from werkzeug.testsuite import iter_suites + + +def suite(): + suite = unittest.TestSuite() + for other_suite in iter_suites(__name__): + suite.addTest(other_suite) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/cache.py b/websdk/werkzeug/testsuite/contrib/cache.py new file mode 100644 index 0000000..209189a --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/cache.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.cache + ~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the cache system + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import os +import time +import unittest +import tempfile +import shutil + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.contrib import cache + +try: + import redis +except ImportError: + redis = None + + + +class SimpleCacheTestCase(WerkzeugTestCase): + + def test_get_dict(self): + c = cache.SimpleCache() + c.set('a', 'a') + c.set('b', 'b') + d = c.get_dict('a', 'b') + assert 'a' in d + assert 'a' == d['a'] + assert 'b' in d + assert 'b' == d['b'] + + def test_set_many(self): + c = cache.SimpleCache() + c.set_many({0: 0, 1: 1, 2: 4}) + assert c.get(2) == 4 + c.set_many((i, i*i) for i in xrange(3)) + assert c.get(2) == 4 + + +class FileSystemCacheTestCase(WerkzeugTestCase): + + def test_set_get(self): + tmp_dir = tempfile.mkdtemp() + try: + c = cache.FileSystemCache(cache_dir=tmp_dir) + for i in range(3): + c.set(str(i), i * i) + for i in range(3): + result = c.get(str(i)) + assert result == i * i + finally: + shutil.rmtree(tmp_dir) + + def test_filesystemcache_prune(self): + THRESHOLD = 13 + tmp_dir = tempfile.mkdtemp() + c = cache.FileSystemCache(cache_dir=tmp_dir, threshold=THRESHOLD) + for i in range(2 * THRESHOLD): + c.set(str(i), i) + cache_files = os.listdir(tmp_dir) + shutil.rmtree(tmp_dir) + assert len(cache_files) <= THRESHOLD + + + def test_filesystemcache_clear(self): + tmp_dir = tempfile.mkdtemp() + c = cache.FileSystemCache(cache_dir=tmp_dir) + c.set('foo', 'bar') + cache_files = os.listdir(tmp_dir) + assert len(cache_files) == 1 + c.clear() + cache_files = os.listdir(tmp_dir) + assert len(cache_files) == 0 + shutil.rmtree(tmp_dir) + + +class RedisCacheTestCase(WerkzeugTestCase): + + def make_cache(self): + return cache.RedisCache(key_prefix='werkzeug-test-case:') + + def teardown(self): + self.make_cache().clear() + + def test_get_set(self): + c = self.make_cache() + c.set('foo', 'bar') + assert c.get('foo') == 'bar' + + def test_get_many(self): + c = self.make_cache() + c.set('foo', 'bar') + c.set('spam', 'eggs') + assert c.get_many('foo', 'spam') == ['bar', 'eggs'] + + def test_set_many(self): + c = self.make_cache() + c.set_many({'foo': 'bar', 'spam': 'eggs'}) + assert c.get('foo') == 'bar' + assert c.get('spam') == 'eggs' + + def test_expire(self): + c = self.make_cache() + c.set('foo', 'bar', 1) + time.sleep(2) + assert c.get('foo') is None + + def test_add(self): + c = self.make_cache() + # sanity check that add() works like set() + c.add('foo', 'bar') + assert c.get('foo') == 'bar' + c.add('foo', 'qux') + assert c.get('foo') == 'bar' + + def test_delete(self): + c = self.make_cache() + c.add('foo', 'bar') + assert c.get('foo') == 'bar' + c.delete('foo') + assert c.get('foo') is None + + def test_delete_many(self): + c = self.make_cache() + c.add('foo', 'bar') + c.add('spam', 'eggs') + c.delete_many('foo', 'spam') + assert c.get('foo') is None + assert c.get('spam') is None + + def test_inc_dec(self): + c = self.make_cache() + c.set('foo', 1) + assert c.inc('foo') == 2 + assert c.dec('foo') == 1 + c.delete('foo') + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SimpleCacheTestCase)) + suite.addTest(unittest.makeSuite(FileSystemCacheTestCase)) + if redis is not None: + suite.addTest(unittest.makeSuite(RedisCacheTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/fixers.py b/websdk/werkzeug/testsuite/contrib/fixers.py new file mode 100644 index 0000000..0422f8b --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/fixers.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.fixers + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + Server / Browser fixers. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.datastructures import ResponseCacheControl +from werkzeug.http import parse_cache_control_header + +from werkzeug.test import create_environ, Client +from werkzeug.wrappers import Request, Response +from werkzeug.contrib import fixers +from werkzeug.utils import redirect + + +@Request.application +def path_check_app(request): + return Response('PATH_INFO: %s\nSCRIPT_NAME: %s' % ( + request.environ.get('PATH_INFO', ''), + request.environ.get('SCRIPT_NAME', '') + )) + + +class ServerFixerTestCase(WerkzeugTestCase): + + def test_lighttpd_cgi_root_fix(self): + app = fixers.LighttpdCGIRootFix(path_check_app) + response = Response.from_app(app, dict(create_environ(), + SCRIPT_NAME='/foo', + PATH_INFO='/bar', + SERVER_SOFTWARE='lighttpd/1.4.27' + )) + assert response.data == 'PATH_INFO: /foo/bar\nSCRIPT_NAME: ' + + def test_path_info_from_request_uri_fix(self): + app = fixers.PathInfoFromRequestUriFix(path_check_app) + for key in 'REQUEST_URI', 'REQUEST_URL', 'UNENCODED_URL': + env = dict(create_environ(), SCRIPT_NAME='/test', PATH_INFO='/?????') + env[key] = '/test/foo%25bar?drop=this' + response = Response.from_app(app, env) + assert response.data == 'PATH_INFO: /foo%bar\nSCRIPT_NAME: /test' + + def test_proxy_fix(self): + """Test the ProxyFix fixer""" + @fixers.ProxyFix + @Request.application + def app(request): + return Response('%s|%s' % ( + request.remote_addr, + # do not use request.host as this fixes too :) + request.environ['HTTP_HOST'] + )) + environ = dict(create_environ(), + HTTP_X_FORWARDED_PROTO="https", + HTTP_X_FORWARDED_HOST='example.com', + HTTP_X_FORWARDED_FOR='1.2.3.4, 5.6.7.8', + REMOTE_ADDR='127.0.0.1', + HTTP_HOST='fake' + ) + + response = Response.from_app(app, environ) + + assert response.data == '1.2.3.4|example.com' + + # And we must check that if it is a redirection it is + # correctly done: + + redirect_app = redirect('/foo/bar.hml') + response = Response.from_app(redirect_app, environ) + + wsgi_headers = response.get_wsgi_headers(environ) + assert wsgi_headers['Location'] == 'https://example.com/foo/bar.hml' + + def test_proxy_fix_weird_enum(self): + @fixers.ProxyFix + @Request.application + def app(request): + return Response(request.remote_addr) + environ = dict(create_environ(), + HTTP_X_FORWARDED_FOR=',', + REMOTE_ADDR='127.0.0.1', + ) + + response = Response.from_app(app, environ) + self.assert_equal(response.data, '127.0.0.1') + + def test_header_rewriter_fix(self): + """Test the HeaderRewriterFix fixer""" + @Request.application + def application(request): + return Response("", headers=[ + ('X-Foo', 'bar') + ]) + application = fixers.HeaderRewriterFix(application, ('X-Foo',), (('X-Bar', '42'),)) + response = Response.from_app(application, create_environ()) + assert response.headers['Content-Type'] == 'text/plain; charset=utf-8' + assert 'X-Foo' not in response.headers + assert response.headers['X-Bar'] == '42' + + +class BrowserFixerTestCase(WerkzeugTestCase): + + def test_ie_fixes(self): + @fixers.InternetExplorerFix + @Request.application + def application(request): + response = Response('binary data here', mimetype='application/vnd.ms-excel') + response.headers['Vary'] = 'Cookie' + response.headers['Content-Disposition'] = 'attachment; filename=foo.xls' + return response + + c = Client(application, Response) + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + + # IE gets no vary + assert response.data == 'binary data here' + assert 'vary' not in response.headers + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + assert response.headers['content-type'] == 'application/vnd.ms-excel' + + # other browsers do + c = Client(application, Response) + response = c.get('/') + assert response.data == 'binary data here' + assert 'vary' in response.headers + + cc = ResponseCacheControl() + cc.no_cache = True + + @fixers.InternetExplorerFix + @Request.application + def application(request): + response = Response('binary data here', mimetype='application/vnd.ms-excel') + response.headers['Pragma'] = ', '.join(pragma) + response.headers['Cache-Control'] = cc.to_header() + response.headers['Content-Disposition'] = 'attachment; filename=foo.xls' + return response + + + # IE has no pragma or cache control + pragma = ('no-cache',) + c = Client(application, Response) + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + assert response.data == 'binary data here' + assert 'pragma' not in response.headers + assert 'cache-control' not in response.headers + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + # IE has simplified pragma + pragma = ('no-cache', 'x-foo') + cc.proxy_revalidate = True + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + assert response.data == 'binary data here' + assert response.headers['pragma'] == 'x-foo' + assert response.headers['cache-control'] == 'proxy-revalidate' + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + # regular browsers get everything + response = c.get('/') + assert response.data == 'binary data here' + assert response.headers['pragma'] == 'no-cache, x-foo' + cc = parse_cache_control_header(response.headers['cache-control'], + cls=ResponseCacheControl) + assert cc.no_cache + assert cc.proxy_revalidate + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ServerFixerTestCase)) + suite.addTest(unittest.makeSuite(BrowserFixerTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/iterio.py b/websdk/werkzeug/testsuite/contrib/iterio.py new file mode 100644 index 0000000..cddbd6e --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/iterio.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.iterio + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the iterio object. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.contrib.iterio import IterIO, greenlet + + +class IterOTestSuite(WerkzeugTestCase): + + def test_basic(self): + io = IterIO(["Hello", "World", "1", "2", "3"]) + assert io.tell() == 0 + assert io.read(2) == "He" + assert io.tell() == 2 + assert io.read(3) == "llo" + assert io.tell() == 5 + io.seek(0) + assert io.read(5) == "Hello" + assert io.tell() == 5 + assert io._buf == "Hello" + assert io.read() == "World123" + assert io.tell() == 13 + io.close() + assert io.closed + + io = IterIO(["Hello\n", "World!"]) + assert io.readline() == 'Hello\n' + assert io._buf == 'Hello\n' + assert io.read() == 'World!' + assert io._buf == 'Hello\nWorld!' + assert io.tell() == 12 + io.seek(0) + assert io.readlines() == ['Hello\n', 'World!'] + + io = IterIO(["foo\n", "bar"]) + io.seek(-4, 2) + assert io.read(4) == '\nbar' + + self.assert_raises(IOError, io.seek, 2, 100) + io.close() + self.assert_raises(ValueError, io.read) + + +class IterITestSuite(WerkzeugTestCase): + + def test_basic(self): + def producer(out): + out.write('1\n') + out.write('2\n') + out.flush() + out.write('3\n') + iterable = IterIO(producer) + self.assert_equal(iterable.next(), '1\n2\n') + self.assert_equal(iterable.next(), '3\n') + self.assert_raises(StopIteration, iterable.next) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(IterOTestSuite)) + if greenlet is not None: + suite.addTest(unittest.makeSuite(IterITestSuite)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/securecookie.py b/websdk/werkzeug/testsuite/contrib/securecookie.py new file mode 100644 index 0000000..fb10fa1 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/securecookie.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.securecookie + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the secure cookie. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.utils import parse_cookie +from werkzeug.wrappers import Request, Response +from werkzeug.contrib.securecookie import SecureCookie + + +class SecureCookieTestCase(WerkzeugTestCase): + + def test_basic_support(self): + c = SecureCookie(secret_key='foo') + assert c.new + print c.modified, c.should_save + assert not c.modified + assert not c.should_save + c['x'] = 42 + assert c.modified + assert c.should_save + s = c.serialize() + + c2 = SecureCookie.unserialize(s, 'foo') + assert c is not c2 + assert not c2.new + assert not c2.modified + assert not c2.should_save + assert c2 == c + + c3 = SecureCookie.unserialize(s, 'wrong foo') + assert not c3.modified + assert not c3.new + assert c3 == {} + + def test_wrapper_support(self): + req = Request.from_values() + resp = Response() + c = SecureCookie.load_cookie(req, secret_key='foo') + assert c.new + c['foo'] = 42 + assert c.secret_key == 'foo' + c.save_cookie(resp) + + req = Request.from_values(headers={ + 'Cookie': 'session="%s"' % parse_cookie(resp.headers['set-cookie'])['session'] + }) + c2 = SecureCookie.load_cookie(req, secret_key='foo') + assert not c2.new + assert c2 == c + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SecureCookieTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/sessions.py b/websdk/werkzeug/testsuite/contrib/sessions.py new file mode 100644 index 0000000..18015a6 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/sessions.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.sessions + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Added tests for the sessions. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +import shutil + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.contrib.sessions import FilesystemSessionStore + +from tempfile import mkdtemp, gettempdir + + +class SessionTestCase(WerkzeugTestCase): + + def setup(self): + self.session_folder = mkdtemp() + + def teardown(self): + shutil.rmtree(self.session_folder) + + def test_default_tempdir(self): + store = FilesystemSessionStore() + assert store.path == gettempdir() + + def test_basic_fs_sessions(self): + store = FilesystemSessionStore(self.session_folder) + x = store.new() + assert x.new + assert not x.modified + x['foo'] = [1, 2, 3] + assert x.modified + store.save(x) + + x2 = store.get(x.sid) + assert not x2.new + assert not x2.modified + assert x2 is not x + assert x2 == x + x2['test'] = 3 + assert x2.modified + assert not x2.new + store.save(x2) + + x = store.get(x.sid) + store.delete(x) + x2 = store.get(x.sid) + # the session is not new when it was used previously. + assert not x2.new + + def test_renewing_fs_session(self): + store = FilesystemSessionStore(self.session_folder, renew_missing=True) + x = store.new() + store.save(x) + store.delete(x) + x2 = store.get(x.sid) + assert x2.new + + def test_fs_session_lising(self): + store = FilesystemSessionStore(self.session_folder, renew_missing=True) + sessions = set() + for x in xrange(10): + sess = store.new() + store.save(sess) + sessions.add(sess.sid) + + listed_sessions = set(store.list()) + assert sessions == listed_sessions + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SessionTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/wrappers.py b/websdk/werkzeug/testsuite/contrib/wrappers.py new file mode 100644 index 0000000..e22d9a4 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/wrappers.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.contrib.wrappers + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Added tests for the sessions. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.contrib import wrappers +from werkzeug import routing +from werkzeug.wrappers import Request, Response + + +class WrappersTestCase(WerkzeugTestCase): + + def test_reverse_slash_behavior(self): + class MyRequest(wrappers.ReverseSlashBehaviorRequestMixin, Request): + pass + req = MyRequest.from_values('/foo/bar', 'http://example.com/test') + assert req.url == 'http://example.com/test/foo/bar' + assert req.path == 'foo/bar' + assert req.script_root == '/test/' + + # make sure the routing system works with the slashes in + # reverse order as well. + map = routing.Map([routing.Rule('/foo/bar', endpoint='foo')]) + adapter = map.bind_to_environ(req.environ) + assert adapter.match() == ('foo', {}) + adapter = map.bind(req.host, req.script_root) + assert adapter.match(req.path) == ('foo', {}) + + def test_dynamic_charset_request_mixin(self): + class MyRequest(wrappers.DynamicCharsetRequestMixin, Request): + pass + env = {'CONTENT_TYPE': 'text/html'} + req = MyRequest(env) + assert req.charset == 'latin1' + + env = {'CONTENT_TYPE': 'text/html; charset=utf-8'} + req = MyRequest(env) + assert req.charset == 'utf-8' + + env = {'CONTENT_TYPE': 'application/octet-stream'} + req = MyRequest(env) + assert req.charset == 'latin1' + assert req.url_charset == 'latin1' + + MyRequest.url_charset = 'utf-8' + env = {'CONTENT_TYPE': 'application/octet-stream'} + req = MyRequest(env) + assert req.charset == 'latin1' + assert req.url_charset == 'utf-8' + + def return_ascii(x): + return "ascii" + env = {'CONTENT_TYPE': 'text/plain; charset=x-weird-charset'} + req = MyRequest(env) + req.unknown_charset = return_ascii + assert req.charset == 'ascii' + assert req.url_charset == 'utf-8' + + def test_dynamic_charset_response_mixin(self): + class MyResponse(wrappers.DynamicCharsetResponseMixin, Response): + default_charset = 'utf-7' + resp = MyResponse(mimetype='text/html') + assert resp.charset == 'utf-7' + resp.charset = 'utf-8' + assert resp.charset == 'utf-8' + assert resp.mimetype == 'text/html' + assert resp.mimetype_params == {'charset': 'utf-8'} + resp.mimetype_params['charset'] = 'iso-8859-15' + assert resp.charset == 'iso-8859-15' + resp.data = u'Hällo Wörld' + assert ''.join(resp.iter_encoded()) == \ + u'Hällo Wörld'.encode('iso-8859-15') + del resp.headers['content-type'] + try: + resp.charset = 'utf-8' + except TypeError, e: + pass + else: + assert False, 'expected type error on charset setting without ct' + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(WrappersTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/datastructures.py b/websdk/werkzeug/testsuite/datastructures.py new file mode 100644 index 0000000..3efedb7 --- /dev/null +++ b/websdk/werkzeug/testsuite/datastructures.py @@ -0,0 +1,618 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.datastructures + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the functionality of the provided Werkzeug + datastructures. + + TODO: + + - FileMultiDict + - convert to proper asserts + - Immutable types undertested + - Split up dict tests + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest +import pickle +from copy import copy +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import datastructures +from werkzeug.exceptions import BadRequestKeyError + + +class MutableMultiDictBaseTestCase(WerkzeugTestCase): + storage_class = None + + def test_pickle(self): + cls = self.storage_class + + for protocol in xrange(pickle.HIGHEST_PROTOCOL + 1): + d = cls() + d.setlist('foo', [1, 2, 3, 4]) + d.setlist('bar', 'foo bar baz'.split()) + s = pickle.dumps(d, protocol) + ud = pickle.loads(s) + self.assert_equal(type(ud), type(d)) + self.assert_equal(ud, d) + self.assert_equal(pickle.loads( + s.replace('werkzeug.datastructures', 'werkzeug')), d) + ud['newkey'] = 'bla' + self.assert_not_equal(ud, d) + + def test_basic_interface(self): + md = self.storage_class() + assert isinstance(md, dict) + + mapping = [('a', 1), ('b', 2), ('a', 2), ('d', 3), + ('a', 1), ('a', 3), ('d', 4), ('c', 3)] + md = self.storage_class(mapping) + + # simple getitem gives the first value + assert md['a'] == 1 + assert md['c'] == 3 + with self.assert_raises(KeyError): + md['e'] + assert md.get('a') == 1 + + # list getitem + assert md.getlist('a') == [1, 2, 1, 3] + assert md.getlist('d') == [3, 4] + # do not raise if key not found + assert md.getlist('x') == [] + + # simple setitem overwrites all values + md['a'] = 42 + assert md.getlist('a') == [42] + + # list setitem + md.setlist('a', [1, 2, 3]) + assert md['a'] == 1 + assert md.getlist('a') == [1, 2, 3] + + # verify that it does not change original lists + l1 = [1, 2, 3] + md.setlist('a', l1) + del l1[:] + assert md['a'] == 1 + + # setdefault, setlistdefault + assert md.setdefault('u', 23) == 23 + assert md.getlist('u') == [23] + del md['u'] + + md.setlist('u', [-1, -2]) + + # delitem + del md['u'] + with self.assert_raises(KeyError): + md['u'] + del md['d'] + assert md.getlist('d') == [] + + # keys, values, items, lists + assert list(sorted(md.keys())) == ['a', 'b', 'c'] + assert list(sorted(md.iterkeys())) == ['a', 'b', 'c'] + + assert list(sorted(md.values())) == [1, 2, 3] + assert list(sorted(md.itervalues())) == [1, 2, 3] + + assert list(sorted(md.items())) == [('a', 1), ('b', 2), ('c', 3)] + assert list(sorted(md.items(multi=True))) == \ + [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('c', 3)] + assert list(sorted(md.iteritems())) == [('a', 1), ('b', 2), ('c', 3)] + assert list(sorted(md.iteritems(multi=True))) == \ + [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('c', 3)] + + assert list(sorted(md.lists())) == [('a', [1, 2, 3]), ('b', [2]), ('c', [3])] + assert list(sorted(md.iterlists())) == [('a', [1, 2, 3]), ('b', [2]), ('c', [3])] + + # copy method + c = md.copy() + assert c['a'] == 1 + assert c.getlist('a') == [1, 2, 3] + + # copy method 2 + c = copy(md) + assert c['a'] == 1 + assert c.getlist('a') == [1, 2, 3] + + # update with a multidict + od = self.storage_class([('a', 4), ('a', 5), ('y', 0)]) + md.update(od) + assert md.getlist('a') == [1, 2, 3, 4, 5] + assert md.getlist('y') == [0] + + # update with a regular dict + md = c + od = {'a': 4, 'y': 0} + md.update(od) + assert md.getlist('a') == [1, 2, 3, 4] + assert md.getlist('y') == [0] + + # pop, poplist, popitem, popitemlist + assert md.pop('y') == 0 + assert 'y' not in md + assert md.poplist('a') == [1, 2, 3, 4] + assert 'a' not in md + assert md.poplist('missing') == [] + + # remaining: b=2, c=3 + popped = md.popitem() + assert popped in [('b', 2), ('c', 3)] + popped = md.popitemlist() + assert popped in [('b', [2]), ('c', [3])] + + # type conversion + md = self.storage_class({'a': '4', 'b': ['2', '3']}) + assert md.get('a', type=int) == 4 + assert md.getlist('b', type=int) == [2, 3] + + # repr + md = self.storage_class([('a', 1), ('a', 2), ('b', 3)]) + assert "('a', 1)" in repr(md) + assert "('a', 2)" in repr(md) + assert "('b', 3)" in repr(md) + + # add and getlist + md.add('c', '42') + md.add('c', '23') + assert md.getlist('c') == ['42', '23'] + md.add('c', 'blah') + assert md.getlist('c', type=int) == [42, 23] + + # setdefault + md = self.storage_class() + md.setdefault('x', []).append(42) + md.setdefault('x', []).append(23) + assert md['x'] == [42, 23] + + # to dict + md = self.storage_class() + md['foo'] = 42 + md.add('bar', 1) + md.add('bar', 2) + assert md.to_dict() == {'foo': 42, 'bar': 1} + assert md.to_dict(flat=False) == {'foo': [42], 'bar': [1, 2]} + + # popitem from empty dict + with self.assert_raises(KeyError): + self.storage_class().popitem() + + with self.assert_raises(KeyError): + self.storage_class().popitemlist() + + # key errors are of a special type + with self.assert_raises(BadRequestKeyError): + self.storage_class()[42] + + # setlist works + md = self.storage_class() + md['foo'] = 42 + md.setlist('foo', [1, 2]) + assert md.getlist('foo') == [1, 2] + + +class ImmutableDictBaseTestCase(WerkzeugTestCase): + storage_class = None + + def test_follows_dict_interface(self): + cls = self.storage_class + + data = {'foo': 1, 'bar': 2, 'baz': 3} + d = cls(data) + + self.assert_equal(d['foo'], 1) + self.assert_equal(d['bar'], 2) + self.assert_equal(d['baz'], 3) + self.assert_equal(sorted(d.keys()), ['bar', 'baz', 'foo']) + self.assert_('foo' in d) + self.assert_('foox' not in d) + self.assert_equal(len(d), 3) + + def test_copies_are_mutable(self): + cls = self.storage_class + immutable = cls({'a': 1}) + with self.assert_raises(TypeError): + immutable.pop('a') + + mutable = immutable.copy() + mutable.pop('a') + self.assert_('a' in immutable) + self.assert_(mutable is not immutable) + self.assert_(copy(immutable) is immutable) + + def test_dict_is_hashable(self): + cls = self.storage_class + immutable = cls({'a': 1, 'b': 2}) + immutable2 = cls({'a': 2, 'b': 2}) + x = set([immutable]) + self.assert_(immutable in x) + self.assert_(immutable2 not in x) + x.discard(immutable) + self.assert_(immutable not in x) + self.assert_(immutable2 not in x) + x.add(immutable2) + self.assert_(immutable not in x) + self.assert_(immutable2 in x) + x.add(immutable) + self.assert_(immutable in x) + self.assert_(immutable2 in x) + + +class ImmutableTypeConversionDictTestCase(ImmutableDictBaseTestCase): + storage_class = datastructures.ImmutableTypeConversionDict + + +class ImmutableMultiDictTestCase(ImmutableDictBaseTestCase): + storage_class = datastructures.ImmutableMultiDict + + def test_multidict_is_hashable(self): + cls = self.storage_class + immutable = cls({'a': [1, 2], 'b': 2}) + immutable2 = cls({'a': [1], 'b': 2}) + x = set([immutable]) + self.assert_(immutable in x) + self.assert_(immutable2 not in x) + x.discard(immutable) + self.assert_(immutable not in x) + self.assert_(immutable2 not in x) + x.add(immutable2) + self.assert_(immutable not in x) + self.assert_(immutable2 in x) + x.add(immutable) + self.assert_(immutable in x) + self.assert_(immutable2 in x) + + +class ImmutableDictTestCase(ImmutableDictBaseTestCase): + storage_class = datastructures.ImmutableDict + + +class ImmutableOrderedMultiDictTestCase(ImmutableDictBaseTestCase): + storage_class = datastructures.ImmutableOrderedMultiDict + + def test_ordered_multidict_is_hashable(self): + a = self.storage_class([('a', 1), ('b', 1), ('a', 2)]) + b = self.storage_class([('a', 1), ('a', 2), ('b', 1)]) + self.assert_not_equal(hash(a), hash(b)) + + +class MultiDictTestCase(MutableMultiDictBaseTestCase): + storage_class = datastructures.MultiDict + + def test_multidict_pop(self): + make_d = lambda: self.storage_class({'foo': [1, 2, 3, 4]}) + d = make_d() + assert d.pop('foo') == 1 + assert not d + d = make_d() + assert d.pop('foo', 32) == 1 + assert not d + d = make_d() + assert d.pop('foos', 32) == 32 + assert d + + with self.assert_raises(KeyError): + d.pop('foos') + + def test_setlistdefault(self): + md = self.storage_class() + assert md.setlistdefault('u', [-1, -2]) == [-1, -2] + assert md.getlist('u') == [-1, -2] + assert md['u'] == -1 + + def test_iter_interfaces(self): + mapping = [('a', 1), ('b', 2), ('a', 2), ('d', 3), + ('a', 1), ('a', 3), ('d', 4), ('c', 3)] + md = self.storage_class(mapping) + assert list(zip(md.keys(), md.listvalues())) == list(md.lists()) + assert list(zip(md, md.iterlistvalues())) == list(md.iterlists()) + assert list(zip(md.iterkeys(), md.iterlistvalues())) == list(md.iterlists()) + + +class OrderedMultiDictTestCase(MutableMultiDictBaseTestCase): + storage_class = datastructures.OrderedMultiDict + + def test_ordered_interface(self): + cls = self.storage_class + + d = cls() + assert not d + d.add('foo', 'bar') + assert len(d) == 1 + d.add('foo', 'baz') + assert len(d) == 1 + assert d.items() == [('foo', 'bar')] + assert list(d) == ['foo'] + assert d.items(multi=True) == [('foo', 'bar'), + ('foo', 'baz')] + del d['foo'] + assert not d + assert len(d) == 0 + assert list(d) == [] + + d.update([('foo', 1), ('foo', 2), ('bar', 42)]) + d.add('foo', 3) + assert d.getlist('foo') == [1, 2, 3] + assert d.getlist('bar') == [42] + assert d.items() == [('foo', 1), ('bar', 42)] + assert d.keys() == list(d) == list(d.iterkeys()) == ['foo', 'bar'] + assert d.items(multi=True) == [('foo', 1), ('foo', 2), + ('bar', 42), ('foo', 3)] + assert len(d) == 2 + + assert d.pop('foo') == 1 + assert d.pop('blafasel', None) is None + assert d.pop('blafasel', 42) == 42 + assert len(d) == 1 + assert d.poplist('bar') == [42] + assert not d + + d.get('missingkey') is None + + d.add('foo', 42) + d.add('foo', 23) + d.add('bar', 2) + d.add('foo', 42) + assert d == datastructures.MultiDict(d) + id = self.storage_class(d) + assert d == id + d.add('foo', 2) + assert d != id + + d.update({'blah': [1, 2, 3]}) + assert d['blah'] == 1 + assert d.getlist('blah') == [1, 2, 3] + + # setlist works + d = self.storage_class() + d['foo'] = 42 + d.setlist('foo', [1, 2]) + assert d.getlist('foo') == [1, 2] + + with self.assert_raises(BadRequestKeyError): + d.pop('missing') + with self.assert_raises(BadRequestKeyError): + d['missing'] + + # popping + d = self.storage_class() + d.add('foo', 23) + d.add('foo', 42) + d.add('foo', 1) + assert d.popitem() == ('foo', 23) + with self.assert_raises(BadRequestKeyError): + d.popitem() + assert not d + + d.add('foo', 23) + d.add('foo', 42) + d.add('foo', 1) + assert d.popitemlist() == ('foo', [23, 42, 1]) + + with self.assert_raises(BadRequestKeyError): + d.popitemlist() + + +class CombinedMultiDictTestCase(WerkzeugTestCase): + storage_class = datastructures.CombinedMultiDict + + def test_basic_interface(self): + d1 = datastructures.MultiDict([('foo', '1')]) + d2 = datastructures.MultiDict([('bar', '2'), ('bar', '3')]) + d = self.storage_class([d1, d2]) + + # lookup + assert d['foo'] == '1' + assert d['bar'] == '2' + assert d.getlist('bar') == ['2', '3'] + + assert sorted(d.items()) == [('bar', '2'), ('foo', '1')], d.items() + assert sorted(d.items(multi=True)) == [('bar', '2'), ('bar', '3'), ('foo', '1')] + assert 'missingkey' not in d + assert 'foo' in d + + # type lookup + assert d.get('foo', type=int) == 1 + assert d.getlist('bar', type=int) == [2, 3] + + # get key errors for missing stuff + with self.assert_raises(KeyError): + d['missing'] + + # make sure that they are immutable + with self.assert_raises(TypeError): + d['foo'] = 'blub' + + # copies are immutable + d = d.copy() + with self.assert_raises(TypeError): + d['foo'] = 'blub' + + # make sure lists merges + md1 = datastructures.MultiDict((("foo", "bar"),)) + md2 = datastructures.MultiDict((("foo", "blafasel"),)) + x = self.storage_class((md1, md2)) + assert x.lists() == [('foo', ['bar', 'blafasel'])] + + +class HeadersTestCase(WerkzeugTestCase): + storage_class = datastructures.Headers + + def test_basic_interface(self): + headers = self.storage_class() + headers.add('Content-Type', 'text/plain') + headers.add('X-Foo', 'bar') + assert 'x-Foo' in headers + assert 'Content-type' in headers + + headers['Content-Type'] = 'foo/bar' + assert headers['Content-Type'] == 'foo/bar' + assert len(headers.getlist('Content-Type')) == 1 + + # list conversion + assert headers.to_list() == [ + ('Content-Type', 'foo/bar'), + ('X-Foo', 'bar') + ] + assert str(headers) == ( + "Content-Type: foo/bar\r\n" + "X-Foo: bar\r\n" + "\r\n") + assert str(self.storage_class()) == "\r\n" + + # extended add + headers.add('Content-Disposition', 'attachment', filename='foo') + assert headers['Content-Disposition'] == 'attachment; filename=foo' + + headers.add('x', 'y', z='"') + assert headers['x'] == r'y; z="\""' + + def test_defaults_and_conversion(self): + # defaults + headers = self.storage_class([ + ('Content-Type', 'text/plain'), + ('X-Foo', 'bar'), + ('X-Bar', '1'), + ('X-Bar', '2') + ]) + assert headers.getlist('x-bar') == ['1', '2'] + assert headers.get('x-Bar') == '1' + assert headers.get('Content-Type') == 'text/plain' + + assert headers.setdefault('X-Foo', 'nope') == 'bar' + assert headers.setdefault('X-Bar', 'nope') == '1' + assert headers.setdefault('X-Baz', 'quux') == 'quux' + assert headers.setdefault('X-Baz', 'nope') == 'quux' + headers.pop('X-Baz') + + # type conversion + assert headers.get('x-bar', type=int) == 1 + assert headers.getlist('x-bar', type=int) == [1, 2] + + # list like operations + assert headers[0] == ('Content-Type', 'text/plain') + assert headers[:1] == self.storage_class([('Content-Type', 'text/plain')]) + del headers[:2] + del headers[-1] + assert headers == self.storage_class([('X-Bar', '1')]) + + def test_copying(self): + a = self.storage_class([('foo', 'bar')]) + b = a.copy() + a.add('foo', 'baz') + assert a.getlist('foo') == ['bar', 'baz'] + assert b.getlist('foo') == ['bar'] + + def test_popping(self): + headers = self.storage_class([('a', 1)]) + assert headers.pop('a') == 1 + assert headers.pop('b', 2) == 2 + + with self.assert_raises(KeyError): + headers.pop('c') + + def test_set_arguments(self): + a = self.storage_class() + a.set('Content-Disposition', 'useless') + a.set('Content-Disposition', 'attachment', filename='foo') + assert a['Content-Disposition'] == 'attachment; filename=foo' + + def test_reject_newlines(self): + h = self.storage_class() + + for variation in 'foo\nbar', 'foo\r\nbar', 'foo\rbar': + with self.assert_raises(ValueError): + h['foo'] = variation + with self.assert_raises(ValueError): + h.add('foo', variation) + with self.assert_raises(ValueError): + h.add('foo', 'test', option=variation) + with self.assert_raises(ValueError): + h.set('foo', variation) + with self.assert_raises(ValueError): + h.set('foo', 'test', option=variation) + + +class EnvironHeadersTestCase(WerkzeugTestCase): + storage_class = datastructures.EnvironHeaders + + def test_basic_interface(self): + # this happens in multiple WSGI servers because they + # use a vary naive way to convert the headers; + broken_env = { + 'HTTP_CONTENT_TYPE': 'text/html', + 'CONTENT_TYPE': 'text/html', + 'HTTP_CONTENT_LENGTH': '0', + 'CONTENT_LENGTH': '0', + 'HTTP_ACCEPT': '*', + 'wsgi.version': (1, 0) + } + headers = self.storage_class(broken_env) + assert headers + assert len(headers) == 3 + assert sorted(headers) == [ + ('Accept', '*'), + ('Content-Length', '0'), + ('Content-Type', 'text/html') + ] + assert not self.storage_class({'wsgi.version': (1, 0)}) + assert len(self.storage_class({'wsgi.version': (1, 0)})) == 0 + + +class HeaderSetTestCase(WerkzeugTestCase): + storage_class = datastructures.HeaderSet + + def test_basic_interface(self): + hs = self.storage_class() + hs.add('foo') + hs.add('bar') + assert 'Bar' in hs + assert hs.find('foo') == 0 + assert hs.find('BAR') == 1 + assert hs.find('baz') < 0 + hs.discard('missing') + hs.discard('foo') + assert hs.find('foo') < 0 + assert hs.find('bar') == 0 + + with self.assert_raises(IndexError): + hs.index('missing') + + assert hs.index('bar') == 0 + assert hs + hs.clear() + assert not hs + + +class ImmutableListTestCase(WerkzeugTestCase): + storage_class = datastructures.ImmutableList + + def test_list_hashable(self): + t = (1, 2, 3, 4) + l = self.storage_class(t) + self.assert_equal(hash(t), hash(l)) + self.assert_not_equal(t, l) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(MultiDictTestCase)) + suite.addTest(unittest.makeSuite(OrderedMultiDictTestCase)) + suite.addTest(unittest.makeSuite(CombinedMultiDictTestCase)) + suite.addTest(unittest.makeSuite(ImmutableTypeConversionDictTestCase)) + suite.addTest(unittest.makeSuite(ImmutableMultiDictTestCase)) + suite.addTest(unittest.makeSuite(ImmutableDictTestCase)) + suite.addTest(unittest.makeSuite(ImmutableOrderedMultiDictTestCase)) + suite.addTest(unittest.makeSuite(HeadersTestCase)) + suite.addTest(unittest.makeSuite(EnvironHeadersTestCase)) + suite.addTest(unittest.makeSuite(HeaderSetTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/debug.py b/websdk/werkzeug/testsuite/debug.py new file mode 100644 index 0000000..ffe9a2e --- /dev/null +++ b/websdk/werkzeug/testsuite/debug.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.debug + ~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests some debug utilities. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +import sys +import re + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.debug.repr import debug_repr, DebugReprGenerator, \ + dump, helper +from werkzeug.debug.console import HTMLStringO + + +class DebugReprTestCase(WerkzeugTestCase): + + def test_basic_repr(self): + assert debug_repr([]) == u'[]' + assert debug_repr([1, 2]) == \ + u'[<span class="number">1</span>, <span class="number">2</span>]' + assert debug_repr([1, 'test']) == \ + u'[<span class="number">1</span>, <span class="string">\'test\'</span>]' + assert debug_repr([None]) == \ + u'[<span class="object">None</span>]' + + def test_sequence_repr(self): + assert debug_repr(list(range(20))) == ( + u'[<span class="number">0</span>, <span class="number">1</span>, ' + u'<span class="number">2</span>, <span class="number">3</span>, ' + u'<span class="number">4</span>, <span class="number">5</span>, ' + u'<span class="number">6</span>, <span class="number">7</span>, ' + u'<span class="extended"><span class="number">8</span>, ' + u'<span class="number">9</span>, <span class="number">10</span>, ' + u'<span class="number">11</span>, <span class="number">12</span>, ' + u'<span class="number">13</span>, <span class="number">14</span>, ' + u'<span class="number">15</span>, <span class="number">16</span>, ' + u'<span class="number">17</span>, <span class="number">18</span>, ' + u'<span class="number">19</span></span>]' + ) + + def test_mapping_repr(self): + assert debug_repr({}) == u'{}' + assert debug_repr({'foo': 42}) == \ + u'{<span class="pair"><span class="key"><span class="string">\'foo\''\ + u'</span></span>: <span class="value"><span class="number">42' \ + u'</span></span></span>}' + assert debug_repr(dict(zip(range(10), [None] * 10))) == \ + u'{<span class="pair"><span class="key"><span class="number">0</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">1</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">2</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">3</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="extended"><span class="pair"><span class="key"><span class="number">4</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">5</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">6</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">7</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">8</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">9</span></span>: <span class="value"><span class="object">None</span></span></span></span>}' + assert debug_repr((1, 'zwei', u'drei')) ==\ + u'(<span class="number">1</span>, <span class="string">\'' \ + u'zwei\'</span>, <span class="string">u\'drei\'</span>)' + + def test_custom_repr(self): + class Foo(object): + def __repr__(self): + return '<Foo 42>' + assert debug_repr(Foo()) == '<span class="object"><Foo 42></span>' + + def test_list_subclass_repr(self): + class MyList(list): + pass + assert debug_repr(MyList([1, 2])) == \ + u'<span class="module">werkzeug.testsuite.debug.</span>MyList([' \ + u'<span class="number">1</span>, <span class="number">2</span>])' + + def test_regex_repr(self): + assert debug_repr(re.compile(r'foo\d')) == \ + u're.compile(<span class="string regex">r\'foo\\d\'</span>)' + assert debug_repr(re.compile(ur'foo\d')) == \ + u're.compile(<span class="string regex">ur\'foo\\d\'</span>)' + + def test_set_repr(self): + assert debug_repr(frozenset('x')) == \ + u'frozenset([<span class="string">\'x\'</span>])' + assert debug_repr(set('x')) == \ + u'set([<span class="string">\'x\'</span>])' + + def test_recursive_repr(self): + a = [1] + a.append(a) + assert debug_repr(a) == u'[<span class="number">1</span>, [...]]' + + def test_broken_repr(self): + class Foo(object): + def __repr__(self): + 1/0 + + assert debug_repr(Foo()) == \ + u'<span class="brokenrepr"><broken repr (ZeroDivisionError: ' \ + u'integer division or modulo by zero)></span>' + + +class DebugHelpersTestCase(WerkzeugTestCase): + + def test_object_dumping(self): + class Foo(object): + x = 42 + y = 23 + def __init__(self): + self.z = 15 + + drg = DebugReprGenerator() + out = drg.dump_object(Foo()) + assert re.search('Details for werkzeug.testsuite.debug.Foo object at', out) + assert re.search('<th>x.*<span class="number">42</span>(?s)', out) + assert re.search('<th>y.*<span class="number">23</span>(?s)', out) + assert re.search('<th>z.*<span class="number">15</span>(?s)', out) + + out = drg.dump_object({'x': 42, 'y': 23}) + assert re.search('Contents of', out) + assert re.search('<th>x.*<span class="number">42</span>(?s)', out) + assert re.search('<th>y.*<span class="number">23</span>(?s)', out) + + out = drg.dump_object({'x': 42, 'y': 23, 23: 11}) + assert not re.search('Contents of', out) + + out = drg.dump_locals({'x': 42, 'y': 23}) + assert re.search('Local variables in frame', out) + assert re.search('<th>x.*<span class="number">42</span>(?s)', out) + assert re.search('<th>y.*<span class="number">23</span>(?s)', out) + + def test_debug_dump(self): + old = sys.stdout + sys.stdout = HTMLStringO() + try: + dump([1, 2, 3]) + x = sys.stdout.reset() + dump() + y = sys.stdout.reset() + finally: + sys.stdout = old + + assert 'Details for list object at' in x + assert '<span class="number">1</span>' in x + assert 'Local variables in frame' in y + assert '<th>x' in y + assert '<th>old' in y + + def test_debug_help(self): + old = sys.stdout + sys.stdout = HTMLStringO() + try: + helper([1, 2, 3]) + x = sys.stdout.reset() + finally: + sys.stdout = old + + assert 'Help on list object' in x + assert '__delitem__' in x + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(DebugReprTestCase)) + suite.addTest(unittest.makeSuite(DebugHelpersTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/exceptions.py b/websdk/werkzeug/testsuite/exceptions.py new file mode 100644 index 0000000..8530b9c --- /dev/null +++ b/websdk/werkzeug/testsuite/exceptions.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.exceptions + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + The tests for the exception classes. + + TODO: + + - This is undertested. HTML is never checked + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import exceptions +from werkzeug.wrappers import Response + + +class ExceptionsTestCase(WerkzeugTestCase): + + def test_proxy_exception(self): + """Proxy exceptions""" + orig_resp = Response('Hello World') + try: + exceptions.abort(orig_resp) + except exceptions.HTTPException, e: + resp = e.get_response({}) + else: + self.fail('exception not raised') + self.assert_(resp is orig_resp) + self.assert_equal(resp.data, 'Hello World') + + def test_aborter(self): + """Exception aborter""" + abort = exceptions.abort + self.assert_raises(exceptions.BadRequest, abort, 400) + self.assert_raises(exceptions.Unauthorized, abort, 401) + self.assert_raises(exceptions.Forbidden, abort, 403) + self.assert_raises(exceptions.NotFound, abort, 404) + self.assert_raises(exceptions.MethodNotAllowed, abort, 405, ['GET', 'HEAD']) + self.assert_raises(exceptions.NotAcceptable, abort, 406) + self.assert_raises(exceptions.RequestTimeout, abort, 408) + self.assert_raises(exceptions.Gone, abort, 410) + self.assert_raises(exceptions.LengthRequired, abort, 411) + self.assert_raises(exceptions.PreconditionFailed, abort, 412) + self.assert_raises(exceptions.RequestEntityTooLarge, abort, 413) + self.assert_raises(exceptions.RequestURITooLarge, abort, 414) + self.assert_raises(exceptions.UnsupportedMediaType, abort, 415) + self.assert_raises(exceptions.InternalServerError, abort, 500) + self.assert_raises(exceptions.NotImplemented, abort, 501) + self.assert_raises(exceptions.BadGateway, abort, 502) + self.assert_raises(exceptions.ServiceUnavailable, abort, 503) + + myabort = exceptions.Aborter({1: exceptions.NotFound}) + self.assert_raises(LookupError, myabort, 404) + self.assert_raises(exceptions.NotFound, myabort, 1) + + myabort = exceptions.Aborter(extra={1: exceptions.NotFound}) + self.assert_raises(exceptions.NotFound, myabort, 404) + self.assert_raises(exceptions.NotFound, myabort, 1) + + def test_exception_repr(self): + exc = exceptions.NotFound() + self.assert_equal(unicode(exc), '404: Not Found') + self.assert_equal(repr(exc), "<NotFound '404: Not Found'>") + + exc = exceptions.NotFound('Not There') + self.assert_equal(unicode(exc), '404: Not There') + self.assert_equal(repr(exc), "<NotFound '404: Not There'>") + + def test_special_exceptions(self): + exc = exceptions.MethodNotAllowed(['GET', 'HEAD', 'POST']) + h = dict(exc.get_headers({})) + self.assert_equal(h['Allow'], 'GET, HEAD, POST') + self.assert_('The method DELETE is not allowed' in exc.get_description({ + 'REQUEST_METHOD': 'DELETE' + })) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ExceptionsTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/formparser.py b/websdk/werkzeug/testsuite/formparser.py new file mode 100644 index 0000000..7dd5b79 --- /dev/null +++ b/websdk/werkzeug/testsuite/formparser.py @@ -0,0 +1,370 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.formparser + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the form parsing facilities. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest +from StringIO import StringIO +from os.path import join, dirname + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import formparser +from werkzeug.test import create_environ, Client +from werkzeug.wrappers import Request, Response +from werkzeug.exceptions import RequestEntityTooLarge + + +@Request.application +def form_data_consumer(request): + result_object = request.args['object'] + if result_object == 'text': + return Response(repr(request.form['text'])) + f = request.files[result_object] + return Response('\n'.join(( + repr(f.filename), + repr(f.name), + repr(f.content_type), + f.stream.read() + ))) + + +def get_contents(filename): + f = file(filename, 'rb') + try: + return f.read() + finally: + f.close() + + +class FormParserTestCase(WerkzeugTestCase): + + def test_limiting(self): + """Test the limiting features""" + data = 'foo=Hello+World&bar=baz' + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='application/x-www-form-urlencoded', + method='POST') + req.max_content_length = 400 + self.assert_equal(req.form['foo'], 'Hello World') + + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='application/x-www-form-urlencoded', + method='POST') + req.max_form_memory_size = 7 + self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo']) + + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='application/x-www-form-urlencoded', + method='POST') + req.max_form_memory_size = 400 + self.assert_equal(req.form['foo'], 'Hello World') + + data = ('--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n' + 'Hello World\r\n' + '--foo\r\nContent-Disposition: form-field; name=bar\r\n\r\n' + 'bar=baz\r\n--foo--') + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + req.max_content_length = 4 + self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo']) + + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + req.max_content_length = 400 + self.assert_equal(req.form['foo'], 'Hello World') + + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + req.max_form_memory_size = 7 + self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo']) + + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + req.max_form_memory_size = 400 + self.assert_equal(req.form['foo'], 'Hello World') + + def test_parse_form_data_put_without_content(self): + """A PUT without a Content-Type header returns empty data + + Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message + containing an entity-body SHOULD include a Content-Type header field + defining the media type of that body." In the case where either + headers are omitted, parse_form_data should still work. + """ + env = create_environ('/foo', 'http://example.org/', method='PUT') + del env['CONTENT_TYPE'] + del env['CONTENT_LENGTH'] + + stream, form, files = formparser.parse_form_data(env) + self.assert_equal(stream.read(), '') + self.assert_equal(len(form), 0) + self.assert_equal(len(files), 0) + + def test_parse_form_data_get_without_content(self): + """GET requests without data, content type and length returns no data""" + env = create_environ('/foo', 'http://example.org/', method='GET') + del env['CONTENT_TYPE'] + del env['CONTENT_LENGTH'] + + stream, form, files = formparser.parse_form_data(env) + self.assert_equal(stream.read(), '') + self.assert_equal(len(form), 0) + self.assert_equal(len(files), 0) + + def test_large_file(self): + """Test a largish file.""" + data = 'x' * (1024 * 600) + req = Request.from_values(data={'foo': (StringIO(data), 'test.txt')}, + method='POST') + # make sure we have a real file here, because we expect to be + # on the disk. > 1024 * 500 + self.assert_(isinstance(req.files['foo'].stream, file)) + + +class MultiPartTestCase(WerkzeugTestCase): + + def test_basic(self): + """Tests multipart parsing against data collected from webbrowsers""" + resources = join(dirname(__file__), 'multipart') + client = Client(form_data_consumer, Response) + + repository = [ + ('firefox3-2png1txt', '---------------------------186454651713519341951581030105', [ + (u'anchor.png', 'file1', 'image/png', 'file1.png'), + (u'application_edit.png', 'file2', 'image/png', 'file2.png') + ], u'example text'), + ('firefox3-2pnglongtext', '---------------------------14904044739787191031754711748', [ + (u'accept.png', 'file1', 'image/png', 'file1.png'), + (u'add.png', 'file2', 'image/png', 'file2.png') + ], u'--long text\r\n--with boundary\r\n--lookalikes--'), + ('opera8-2png1txt', '----------zEO9jQKmLc2Cq88c23Dx19', [ + (u'arrow_branch.png', 'file1', 'image/png', 'file1.png'), + (u'award_star_bronze_1.png', 'file2', 'image/png', 'file2.png') + ], u'blafasel öäü'), + ('webkit3-2png1txt', '----WebKitFormBoundaryjdSFhcARk8fyGNy6', [ + (u'gtk-apply.png', 'file1', 'image/png', 'file1.png'), + (u'gtk-no.png', 'file2', 'image/png', 'file2.png') + ], u'this is another text with ümläüts'), + ('ie6-2png1txt', '---------------------------7d91b03a20128', [ + (u'file1.png', 'file1', 'image/x-png', 'file1.png'), + (u'file2.png', 'file2', 'image/x-png', 'file2.png') + ], u'ie6 sucks :-/') + ] + + for name, boundary, files, text in repository: + folder = join(resources, name) + data = get_contents(join(folder, 'request.txt')) + for filename, field, content_type, fsname in files: + response = client.post('/?object=' + field, data=data, content_type= + 'multipart/form-data; boundary="%s"' % boundary, + content_length=len(data)) + lines = response.data.split('\n', 3) + self.assert_equal(lines[0], repr(filename)) + self.assert_equal(lines[1], repr(field)) + self.assert_equal(lines[2], repr(content_type)) + self.assert_equal(lines[3], get_contents(join(folder, fsname))) + response = client.post('/?object=text', data=data, content_type= + 'multipart/form-data; boundary="%s"' % boundary, + content_length=len(data)) + self.assert_equal(response.data, repr(text)) + + def test_ie7_unc_path(self): + client = Client(form_data_consumer, Response) + data_file = join(dirname(__file__), 'multipart', 'ie7_full_path_request.txt') + data = get_contents(data_file) + boundary = '---------------------------7da36d1b4a0164' + response = client.post('/?object=cb_file_upload_multiple', data=data, content_type= + 'multipart/form-data; boundary="%s"' % boundary, content_length=len(data)) + lines = response.data.split('\n', 3) + self.assert_equal(lines[0], + repr(u'Sellersburg Town Council Meeting 02-22-2010doc.doc')) + + def test_end_of_file(self): + """Test for multipart files ending unexpectedly""" + # This test looks innocent but it was actually timeing out in + # the Werkzeug 0.5 release version (#394) + data = ( + '--foo\r\n' + 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n' + 'Content-Type: text/plain\r\n\r\n' + 'file contents and no end' + ) + data = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + self.assert_(not data.files) + self.assert_(not data.form) + + def test_broken(self): + """Broken multipart does not break the applicaiton""" + data = ( + '--foo\r\n' + 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n' + 'Content-Transfer-Encoding: base64\r\n' + 'Content-Type: text/plain\r\n\r\n' + 'broken base 64' + '--foo--' + ) + _, form, files = formparser.parse_form_data(create_environ(data=data, + method='POST', content_type='multipart/form-data; boundary=foo')) + self.assert_(not files) + self.assert_(not form) + + self.assert_raises(ValueError, formparser.parse_form_data, + create_environ(data=data, method='POST', + content_type='multipart/form-data; boundary=foo'), + silent=False) + + def test_file_no_content_type(self): + """Chrome does not always provide a content type.""" + data = ( + '--foo\r\n' + 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n\r\n' + 'file contents\r\n--foo--' + ) + data = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + self.assert_equal(data.files['test'].filename, 'test.txt') + self.assert_equal(data.files['test'].read(), 'file contents') + + def test_extra_newline(self): + """Test for multipart uploads with extra newlines""" + # this test looks innocent but it was actually timeing out in + # the Werkzeug 0.5 release version (#394) + data = ( + '\r\n\r\n--foo\r\n' + 'Content-Disposition: form-data; name="foo"\r\n\r\n' + 'a string\r\n' + '--foo--' + ) + data = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + self.assert_(not data.files) + self.assert_equal(data.form['foo'], 'a string') + + def test_headers(self): + """Test access to multipart headers""" + data = ('--foo\r\n' + 'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n' + 'X-Custom-Header: blah\r\n' + 'Content-Type: text/plain; charset=utf-8\r\n\r\n' + 'file contents, just the contents\r\n' + '--foo--') + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + foo = req.files['foo'] + self.assert_equal(foo.mimetype, 'text/plain') + self.assert_equal(foo.mimetype_params, {'charset': 'utf-8'}) + self.assert_equal(foo.headers['content-type'], foo.content_type) + self.assert_equal(foo.content_type, 'text/plain; charset=utf-8') + self.assert_equal(foo.headers['x-custom-header'], 'blah') + + def test_nonstandard_line_endings(self): + """Test nonstandard line endings of multipart form data""" + for nl in '\n', '\r', '\r\n': + data = nl.join(( + '--foo', + 'Content-Disposition: form-data; name=foo', + '', + 'this is just bar', + '--foo', + 'Content-Disposition: form-data; name=bar', + '', + 'blafasel', + '--foo--' + )) + req = Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; ' + 'boundary=foo', method='POST') + self.assert_equal(req.form['foo'], 'this is just bar') + self.assert_equal(req.form['bar'], 'blafasel') + + def test_failures(self): + def parse_multipart(stream, boundary, content_length): + parser = formparser.MultiPartParser(content_length) + return parser.parse(stream, boundary, content_length) + self.assert_raises(ValueError, parse_multipart, StringIO(''), '', 0) + self.assert_raises(ValueError, parse_multipart, StringIO(''), 'broken ', 0) + + data = '--foo\r\n\r\nHello World\r\n--foo--' + self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data)) + + data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n' \ + 'Content-Transfer-Encoding: base64\r\n\r\nHello World\r\n--foo--' + self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data)) + + data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\nHello World\r\n' + self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data)) + + x = formparser.parse_multipart_headers(['foo: bar\r\n', ' x test\r\n']) + self.assert_equal(x['foo'], 'bar\n x test') + self.assert_raises(ValueError, formparser.parse_multipart_headers, + ['foo: bar\r\n', ' x test']) + + def test_bad_newline_bad_newline_assumption(self): + """Make sure we don't eat up stuff that is not a newline""" + class ISORequest(Request): + charset = 'latin1' + contents = 'U2vlbmUgbORu' + data = '--foo\r\nContent-Disposition: form-data; name="test"\r\n' \ + 'Content-Transfer-Encoding: base64\r\n\r\n' + \ + contents + '\r\n--foo--' + req = ISORequest.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + self.assert_equal(req.form['test'], u'Sk\xe5ne l\xe4n') + + +class InternalFunctionsTestCase(WerkzeugTestCase): + + def test_lien_parser(self): + assert formparser._line_parse('foo') == ('foo', False) + assert formparser._line_parse('foo\r\n') == ('foo', True) + assert formparser._line_parse('foo\r') == ('foo', True) + assert formparser._line_parse('foo\n') == ('foo', True) + + def test_find_terminator(self): + lineiter = iter('\n\n\nfoo\nbar\nbaz'.splitlines(True)) + find_terminator = formparser.MultiPartParser()._find_terminator + line = find_terminator(lineiter) + assert line == 'foo' + assert list(lineiter) == ['bar\n', 'baz'] + assert find_terminator([]) == '' + assert find_terminator(['']) == '' + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(FormParserTestCase)) + suite.addTest(unittest.makeSuite(MultiPartTestCase)) + suite.addTest(unittest.makeSuite(InternalFunctionsTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/http.py b/websdk/werkzeug/testsuite/http.py new file mode 100644 index 0000000..733aee2 --- /dev/null +++ b/websdk/werkzeug/testsuite/http.py @@ -0,0 +1,392 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.http + ~~~~~~~~~~~~~~~~~~~~~~~ + + HTTP parsing utilities. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +from datetime import datetime + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import http, datastructures +from werkzeug.test import create_environ + + +class HTTPUtilityTestCase(WerkzeugTestCase): + + def test_accept(self): + a = http.parse_accept_header('en-us,ru;q=0.5') + self.assert_equal(a.values(), ['en-us', 'ru']) + self.assert_equal(a.best, 'en-us') + self.assert_equal(a.find('ru'), 1) + self.assert_raises(ValueError, a.index, 'de') + self.assert_equal(a.to_header(), 'en-us,ru;q=0.5') + + def test_mime_accept(self): + a = http.parse_accept_header('text/xml,application/xml,' + 'application/xhtml+xml,' + 'text/html;q=0.9,text/plain;q=0.8,' + 'image/png,*/*;q=0.5', + datastructures.MIMEAccept) + self.assert_raises(ValueError, lambda: a['missing']) + self.assert_equal(a['image/png'], 1) + self.assert_equal(a['text/plain'], 0.8) + self.assert_equal(a['foo/bar'], 0.5) + self.assert_equal(a[a.find('foo/bar')], ('*/*', 0.5)) + + def test_accept_matches(self): + a = http.parse_accept_header('text/xml,application/xml,application/xhtml+xml,' + 'text/html;q=0.9,text/plain;q=0.8,' + 'image/png', datastructures.MIMEAccept) + self.assert_equal(a.best_match(['text/html', 'application/xhtml+xml']), + 'application/xhtml+xml') + self.assert_equal(a.best_match(['text/html']), 'text/html') + self.assert_(a.best_match(['foo/bar']) is None) + self.assert_equal(a.best_match(['foo/bar', 'bar/foo'], + default='foo/bar'), 'foo/bar') + self.assert_equal(a.best_match(['application/xml', 'text/xml']), 'application/xml') + + def test_charset_accept(self): + a = http.parse_accept_header('ISO-8859-1,utf-8;q=0.7,*;q=0.7', + datastructures.CharsetAccept) + self.assert_equal(a['iso-8859-1'], a['iso8859-1']) + self.assert_equal(a['iso-8859-1'], 1) + self.assert_equal(a['UTF8'], 0.7) + self.assert_equal(a['ebcdic'], 0.7) + + def test_language_accept(self): + a = http.parse_accept_header('de-AT,de;q=0.8,en;q=0.5', + datastructures.LanguageAccept) + self.assert_equal(a.best, 'de-AT') + self.assert_('de_AT' in a) + self.assert_('en' in a) + self.assert_equal(a['de-at'], 1) + self.assert_equal(a['en'], 0.5) + + def test_set_header(self): + hs = http.parse_set_header('foo, Bar, "Blah baz", Hehe') + self.assert_('blah baz' in hs) + self.assert_('foobar' not in hs) + self.assert_('foo' in hs) + self.assert_equal(list(hs), ['foo', 'Bar', 'Blah baz', 'Hehe']) + hs.add('Foo') + self.assert_equal(hs.to_header(), 'foo, Bar, "Blah baz", Hehe') + + def test_list_header(self): + hl = http.parse_list_header('foo baz, blah') + self.assert_equal(hl, ['foo baz', 'blah']) + + def test_dict_header(self): + d = http.parse_dict_header('foo="bar baz", blah=42') + self.assert_equal(d, {'foo': 'bar baz', 'blah': '42'}) + + def test_cache_control_header(self): + cc = http.parse_cache_control_header('max-age=0, no-cache') + assert cc.max_age == 0 + assert cc.no_cache + cc = http.parse_cache_control_header('private, community="UCI"', None, + datastructures.ResponseCacheControl) + assert cc.private + assert cc['community'] == 'UCI' + + c = datastructures.ResponseCacheControl() + assert c.no_cache is None + assert c.private is None + c.no_cache = True + assert c.no_cache == '*' + c.private = True + assert c.private == '*' + del c.private + assert c.private is None + assert c.to_header() == 'no-cache' + + def test_authorization_header(self): + a = http.parse_authorization_header('Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==') + assert a.type == 'basic' + assert a.username == 'Aladdin' + assert a.password == 'open sesame' + + a = http.parse_authorization_header('''Digest username="Mufasa", + realm="testrealm@host.invalid", + nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", + uri="/dir/index.html", + qop=auth, + nc=00000001, + cnonce="0a4f113b", + response="6629fae49393a05397450978507c4ef1", + opaque="5ccc069c403ebaf9f0171e9517f40e41"''') + assert a.type == 'digest' + assert a.username == 'Mufasa' + assert a.realm == 'testrealm@host.invalid' + assert a.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093' + assert a.uri == '/dir/index.html' + assert 'auth' in a.qop + assert a.nc == '00000001' + assert a.cnonce == '0a4f113b' + assert a.response == '6629fae49393a05397450978507c4ef1' + assert a.opaque == '5ccc069c403ebaf9f0171e9517f40e41' + + a = http.parse_authorization_header('''Digest username="Mufasa", + realm="testrealm@host.invalid", + nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", + uri="/dir/index.html", + response="e257afa1414a3340d93d30955171dd0e", + opaque="5ccc069c403ebaf9f0171e9517f40e41"''') + assert a.type == 'digest' + assert a.username == 'Mufasa' + assert a.realm == 'testrealm@host.invalid' + assert a.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093' + assert a.uri == '/dir/index.html' + assert a.response == 'e257afa1414a3340d93d30955171dd0e' + assert a.opaque == '5ccc069c403ebaf9f0171e9517f40e41' + + assert http.parse_authorization_header('') is None + assert http.parse_authorization_header(None) is None + assert http.parse_authorization_header('foo') is None + + def test_www_authenticate_header(self): + wa = http.parse_www_authenticate_header('Basic realm="WallyWorld"') + assert wa.type == 'basic' + assert wa.realm == 'WallyWorld' + wa.realm = 'Foo Bar' + assert wa.to_header() == 'Basic realm="Foo Bar"' + + wa = http.parse_www_authenticate_header('''Digest + realm="testrealm@host.com", + qop="auth,auth-int", + nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", + opaque="5ccc069c403ebaf9f0171e9517f40e41"''') + assert wa.type == 'digest' + assert wa.realm == 'testrealm@host.com' + assert 'auth' in wa.qop + assert 'auth-int' in wa.qop + assert wa.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093' + assert wa.opaque == '5ccc069c403ebaf9f0171e9517f40e41' + + wa = http.parse_www_authenticate_header('broken') + assert wa.type == 'broken' + + assert not http.parse_www_authenticate_header('').type + assert not http.parse_www_authenticate_header('') + + def test_etags(self): + assert http.quote_etag('foo') == '"foo"' + assert http.quote_etag('foo', True) == 'w/"foo"' + assert http.unquote_etag('"foo"') == ('foo', False) + assert http.unquote_etag('w/"foo"') == ('foo', True) + es = http.parse_etags('"foo", "bar", w/"baz", blar') + assert sorted(es) == ['bar', 'blar', 'foo'] + assert 'foo' in es + assert 'baz' not in es + assert es.contains_weak('baz') + assert 'blar' in es + assert es.contains_raw('w/"baz"') + assert es.contains_raw('"foo"') + assert sorted(es.to_header().split(', ')) == ['"bar"', '"blar"', '"foo"', 'w/"baz"'] + + def test_parse_date(self): + assert http.parse_date('Sun, 06 Nov 1994 08:49:37 GMT ') == datetime(1994, 11, 6, 8, 49, 37) + assert http.parse_date('Sunday, 06-Nov-94 08:49:37 GMT') == datetime(1994, 11, 6, 8, 49, 37) + assert http.parse_date(' Sun Nov 6 08:49:37 1994') == datetime(1994, 11, 6, 8, 49, 37) + assert http.parse_date('foo') is None + + def test_parse_date_overflows(self): + assert http.parse_date(' Sun 02 Feb 1343 08:49:37 GMT') == datetime(1343, 2, 2, 8, 49, 37) + assert http.parse_date('Thu, 01 Jan 1970 00:00:00 GMT') == datetime(1970, 1, 1, 0, 0) + assert http.parse_date('Thu, 33 Jan 1970 00:00:00 GMT') is None + + def test_remove_entity_headers(self): + now = http.http_date() + headers1 = [('Date', now), ('Content-Type', 'text/html'), ('Content-Length', '0')] + headers2 = datastructures.Headers(headers1) + + http.remove_entity_headers(headers1) + assert headers1 == [('Date', now)] + + http.remove_entity_headers(headers2) + assert headers2 == datastructures.Headers([('Date', now)]) + + def test_remove_hop_by_hop_headers(self): + headers1 = [('Connection', 'closed'), ('Foo', 'bar'), + ('Keep-Alive', 'wtf')] + headers2 = datastructures.Headers(headers1) + + http.remove_hop_by_hop_headers(headers1) + assert headers1 == [('Foo', 'bar')] + + http.remove_hop_by_hop_headers(headers2) + assert headers2 == datastructures.Headers([('Foo', 'bar')]) + + def test_parse_options_header(self): + assert http.parse_options_header('something; foo="other\"thing"') == \ + ('something', {'foo': 'other"thing'}) + assert http.parse_options_header('something; foo="other\"thing"; meh=42') == \ + ('something', {'foo': 'other"thing', 'meh': '42'}) + assert http.parse_options_header('something; foo="other\"thing"; meh=42; bleh') == \ + ('something', {'foo': 'other"thing', 'meh': '42', 'bleh': None}) + + def test_dump_options_header(self): + assert http.dump_options_header('foo', {'bar': 42}) == \ + 'foo; bar=42' + assert http.dump_options_header('foo', {'bar': 42, 'fizz': None}) == \ + 'foo; bar=42; fizz' + + def test_dump_header(self): + assert http.dump_header([1, 2, 3]) == '1, 2, 3' + assert http.dump_header([1, 2, 3], allow_token=False) == '"1", "2", "3"' + assert http.dump_header({'foo': 'bar'}, allow_token=False) == 'foo="bar"' + assert http.dump_header({'foo': 'bar'}) == 'foo=bar' + + def test_is_resource_modified(self): + env = create_environ() + + # ignore POST + env['REQUEST_METHOD'] = 'POST' + assert not http.is_resource_modified(env, etag='testing') + env['REQUEST_METHOD'] = 'GET' + + # etagify from data + self.assert_raises(TypeError, http.is_resource_modified, env, + data='42', etag='23') + env['HTTP_IF_NONE_MATCH'] = http.generate_etag('awesome') + assert not http.is_resource_modified(env, data='awesome') + + env['HTTP_IF_MODIFIED_SINCE'] = http.http_date(datetime(2008, 1, 1, 12, 30)) + assert not http.is_resource_modified(env, + last_modified=datetime(2008, 1, 1, 12, 00)) + assert http.is_resource_modified(env, + last_modified=datetime(2008, 1, 1, 13, 00)) + + def test_date_formatting(self): + assert http.cookie_date(0) == 'Thu, 01-Jan-1970 00:00:00 GMT' + assert http.cookie_date(datetime(1970, 1, 1)) == 'Thu, 01-Jan-1970 00:00:00 GMT' + assert http.http_date(0) == 'Thu, 01 Jan 1970 00:00:00 GMT' + assert http.http_date(datetime(1970, 1, 1)) == 'Thu, 01 Jan 1970 00:00:00 GMT' + + def test_cookies(self): + assert http.parse_cookie('dismiss-top=6; CP=null*; PHPSESSID=0a539d42abc001cd' + 'c762809248d4beed; a=42') == { + 'CP': u'null*', + 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed', + 'a': u'42', + 'dismiss-top': u'6' + } + assert set(http.dump_cookie('foo', 'bar baz blub', 360, httponly=True, + sync_expires=False).split('; ')) == \ + set(['HttpOnly', 'Max-Age=360', 'Path=/', 'foo="bar baz blub"']) + assert http.parse_cookie('fo234{=bar blub=Blah') == {'blub': 'Blah'} + + def test_cookie_quoting(self): + val = http.dump_cookie("foo", "?foo") + assert val == 'foo="?foo"; Path=/' + assert http.parse_cookie(val) == {'foo': '?foo'} + + assert http.parse_cookie(r'foo="foo\054bar"') == {'foo': 'foo,bar'} + + +class RangeTestCase(WerkzeugTestCase): + + def test_if_range_parsing(self): + rv = http.parse_if_range_header('"Test"') + assert rv.etag == 'Test' + assert rv.date is None + assert rv.to_header() == '"Test"' + + # weak information is dropped + rv = http.parse_if_range_header('w/"Test"') + assert rv.etag == 'Test' + assert rv.date is None + assert rv.to_header() == '"Test"' + + # broken etags are supported too + rv = http.parse_if_range_header('bullshit') + assert rv.etag == 'bullshit' + assert rv.date is None + assert rv.to_header() == '"bullshit"' + + rv = http.parse_if_range_header('Thu, 01 Jan 1970 00:00:00 GMT') + assert rv.etag is None + assert rv.date == datetime(1970, 1, 1) + assert rv.to_header() == 'Thu, 01 Jan 1970 00:00:00 GMT' + + for x in '', None: + rv = http.parse_if_range_header(x) + assert rv.etag is None + assert rv.date is None + assert rv.to_header() == '' + + def test_range_parsing(): + rv = http.parse_range_header('bytes=52') + assert rv is None + + rv = http.parse_range_header('bytes=52-') + assert rv.units == 'bytes' + assert rv.ranges == [(52, None)] + assert rv.to_header() == 'bytes=52-' + + rv = http.parse_range_header('bytes=52-99') + assert rv.units == 'bytes' + assert rv.ranges == [(52, 100)] + assert rv.to_header() == 'bytes=52-99' + + rv = http.parse_range_header('bytes=52-99,-1000') + assert rv.units == 'bytes' + assert rv.ranges == [(52, 100), (-1000, None)] + assert rv.to_header() == 'bytes=52-99,-1000' + + rv = http.parse_range_header('bytes = 1 - 100') + assert rv.units == 'bytes' + assert rv.ranges == [(1, 101)] + assert rv.to_header() == 'bytes=1-100' + + rv = http.parse_range_header('AWesomes=0-999') + assert rv.units == 'awesomes' + assert rv.ranges == [(0, 1000)] + assert rv.to_header() == 'awesomes=0-999' + + def test_content_range_parsing(): + rv = http.parse_content_range_header('bytes 0-98/*') + assert rv.units == 'bytes' + assert rv.start == 0 + assert rv.stop == 99 + assert rv.length is None + assert rv.to_header() == 'bytes 0-98/*' + + rv = http.parse_content_range_header('bytes 0-98/*asdfsa') + assert rv is None + + rv = http.parse_content_range_header('bytes 0-99/100') + assert rv.to_header() == 'bytes 0-99/100' + rv.start = None + rv.stop = None + assert rv.units == 'bytes' + assert rv.to_header() == 'bytes */100' + + rv = http.parse_content_range_header('bytes */100') + assert rv.start is None + assert rv.stop is None + assert rv.length == 100 + assert rv.units == 'bytes' + + +class RegressionTestCase(WerkzeugTestCase): + + def test_best_match_works(self): + # was a bug in 0.6 + rv = http.parse_accept_header('foo=,application/xml,application/xhtml+xml,' + 'text/html;q=0.9,text/plain;q=0.8,' + 'image/png,*/*;q=0.5', + datastructures.MIMEAccept).best_match(['foo/bar']) + self.assert_equal(rv, 'foo/bar') + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(HTTPUtilityTestCase)) + suite.addTest(unittest.makeSuite(RegressionTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/internal.py b/websdk/werkzeug/testsuite/internal.py new file mode 100644 index 0000000..1de1b5d --- /dev/null +++ b/websdk/werkzeug/testsuite/internal.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.internal + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Internal tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from datetime import datetime +from warnings import filterwarnings, resetwarnings + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.wrappers import Request, Response + +from werkzeug import _internal as internal +from werkzeug.test import create_environ + + +class InternalTestCase(WerkzeugTestCase): + + def test_date_to_unix(self): + assert internal._date_to_unix(datetime(1970, 1, 1)) == 0 + assert internal._date_to_unix(datetime(1970, 1, 1, 1, 0, 0)) == 3600 + assert internal._date_to_unix(datetime(1970, 1, 1, 1, 1, 1)) == 3661 + x = datetime(2010, 2, 15, 16, 15, 39) + assert internal._date_to_unix(x) == 1266250539 + + def test_easteregg(self): + req = Request.from_values('/?macgybarchakku') + resp = Response.force_type(internal._easteregg(None), req) + assert 'About Werkzeug' in resp.data + assert 'the Swiss Army knife of Python web development' in resp.data + + def test_wrapper_internals(self): + req = Request.from_values(data={'foo': 'bar'}, method='POST') + req._load_form_data() + assert req.form.to_dict() == {'foo': 'bar'} + + # second call does not break + req._load_form_data() + assert req.form.to_dict() == {'foo': 'bar'} + + # check reprs + assert repr(req) == "<Request 'http://localhost/' [POST]>" + resp = Response() + assert repr(resp) == '<Response 0 bytes [200 OK]>' + resp.data = 'Hello World!' + assert repr(resp) == '<Response 12 bytes [200 OK]>' + resp.response = iter(['Test']) + assert repr(resp) == '<Response streamed [200 OK]>' + + # unicode data does not set content length + response = Response([u'Hällo Wörld']) + headers = response.get_wsgi_headers(create_environ()) + assert 'Content-Length' not in headers + + response = Response(['Hällo Wörld']) + headers = response.get_wsgi_headers(create_environ()) + assert 'Content-Length' in headers + + # check for internal warnings + filterwarnings('error', category=Warning) + response = Response() + environ = create_environ() + response.response = 'What the...?' + self.assert_raises(Warning, lambda: list(response.iter_encoded())) + self.assert_raises(Warning, lambda: list(response.get_app_iter(environ))) + response.direct_passthrough = True + self.assert_raises(Warning, lambda: list(response.iter_encoded())) + self.assert_raises(Warning, lambda: list(response.get_app_iter(environ))) + resetwarnings() + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(InternalTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/local.py b/websdk/werkzeug/testsuite/local.py new file mode 100644 index 0000000..236e753 --- /dev/null +++ b/websdk/werkzeug/testsuite/local.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.local + ~~~~~~~~~~~~~~~~~~~~~~~~ + + Local and local proxy tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import time +import unittest +from threading import Thread + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import local + + +class LocalTestCase(WerkzeugTestCase): + + def test_basic_local(self): + l = local.Local() + l.foo = 0 + values = [] + def value_setter(idx): + time.sleep(0.01 * idx) + l.foo = idx + time.sleep(0.02) + values.append(l.foo) + threads = [Thread(target=value_setter, args=(x,)) + for x in [1, 2, 3]] + for thread in threads: + thread.start() + time.sleep(0.2) + assert sorted(values) == [1, 2, 3] + + def delfoo(): + del l.foo + delfoo() + self.assert_raises(AttributeError, lambda: l.foo) + self.assert_raises(AttributeError, delfoo) + + local.release_local(l) + + def test_local_release(self): + loc = local.Local() + loc.foo = 42 + local.release_local(loc) + assert not hasattr(loc, 'foo') + + ls = local.LocalStack() + ls.push(42) + local.release_local(ls) + assert ls.top is None + + def test_local_proxy(self): + foo = [] + ls = local.LocalProxy(lambda: foo) + ls.append(42) + ls.append(23) + ls[1:] = [1, 2, 3] + assert foo == [42, 1, 2, 3] + assert repr(foo) == repr(ls) + assert foo[0] == 42 + foo += [1] + assert list(foo) == [42, 1, 2, 3, 1] + + def test_local_stack(self): + ident = local.get_ident() + + ls = local.LocalStack() + assert ident not in ls._local.__storage__ + assert ls.top is None + ls.push(42) + assert ident in ls._local.__storage__ + assert ls.top == 42 + ls.push(23) + assert ls.top == 23 + ls.pop() + assert ls.top == 42 + ls.pop() + assert ls.top is None + assert ls.pop() is None + assert ls.pop() is None + + proxy = ls() + ls.push([1, 2]) + assert proxy == [1, 2] + ls.push((1, 2)) + assert proxy == (1, 2) + ls.pop() + ls.pop() + assert repr(proxy) == '<LocalProxy unbound>' + + assert ident not in ls._local.__storage__ + + def test_local_proxies_with_callables(self): + foo = 42 + ls = local.LocalProxy(lambda: foo) + assert ls == 42 + foo = [23] + ls.append(42) + assert ls == [23, 42] + assert foo == [23, 42] + + def test_custom_idents(self): + ident = 0 + loc = local.Local() + stack = local.LocalStack() + mgr = local.LocalManager([loc, stack], ident_func=lambda: ident) + + loc.foo = 42 + stack.push({'foo': 42}) + ident = 1 + loc.foo = 23 + stack.push({'foo': 23}) + ident = 0 + assert loc.foo == 42 + assert stack.top['foo'] == 42 + stack.pop() + assert stack.top is None + ident = 1 + assert loc.foo == 23 + assert stack.top['foo'] == 23 + stack.pop() + assert stack.top is None + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(LocalTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/multipart/collect.py b/websdk/werkzeug/testsuite/multipart/collect.py new file mode 100644 index 0000000..5fce5e9 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/collect.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +""" +Hacky helper application to collect form data. +""" +from werkzeug.serving import run_simple +from werkzeug.wrappers import Request, Response + + +def copy_stream(request): + from os import mkdir + from time import time + folder = 'request-%d' % time() + mkdir(folder) + environ = request.environ + f = file(folder + '/request.txt', 'wb+') + f.write(environ['wsgi.input'].read(int(environ['CONTENT_LENGTH']))) + f.flush() + f.seek(0) + environ['wsgi.input'] = f + request.stat_folder = folder + + +def stats(request): + copy_stream(request) + f1 = request.files['file1'] + f2 = request.files['file2'] + text = request.form['text'] + f1.save(request.stat_folder + '/file1.bin') + f2.save(request.stat_folder + '/file2.bin') + file(request.stat_folder + '/text.txt', 'w').write(text.encode('utf-8')) + return Response('Done.') + + +def upload_file(request): + return Response(''' + <h1>Upload File</h1> + <form action="" method="post" enctype="multipart/form-data"> + <input type="file" name="file1"><br> + <input type="file" name="file2"><br> + <textarea name="text"></textarea><br> + <input type="submit" value="Send"> + </form> + ''', mimetype='text/html') + + +def application(environ, start_responseonse): + request = Request(environ) + if request.method == 'POST': + response = stats(request) + else: + response = upload_file(request) + return response(environ, start_responseonse) + + +if __name__ == '__main__': + run_simple('localhost', 5000, application, use_debugger=True) diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png Binary files differnew file mode 100644 index 0000000..9b3422c --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png Binary files differnew file mode 100644 index 0000000..fb2efb8 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt Binary files differnew file mode 100644 index 0000000..721e04e --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt new file mode 100644 index 0000000..c87634d --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt @@ -0,0 +1 @@ +example text
\ No newline at end of file diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png Binary files differnew file mode 100644 index 0000000..89c8129 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png Binary files differnew file mode 100644 index 0000000..6332fef --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt Binary files differnew file mode 100644 index 0000000..489290b --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt new file mode 100644 index 0000000..3bf804d --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt @@ -0,0 +1,3 @@ +--long text +--with boundary +--lookalikes--
\ No newline at end of file diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png Binary files differnew file mode 100644 index 0000000..9b3422c --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png Binary files differnew file mode 100644 index 0000000..fb2efb8 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt Binary files differnew file mode 100644 index 0000000..59fdeae --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt new file mode 100644 index 0000000..7c465b7 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt @@ -0,0 +1 @@ +ie6 sucks :-/
\ No newline at end of file diff --git a/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt b/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt Binary files differnew file mode 100644 index 0000000..acc4e2e --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png Binary files differnew file mode 100644 index 0000000..7542db1 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png Binary files differnew file mode 100644 index 0000000..658c711 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt Binary files differnew file mode 100644 index 0000000..8f32591 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt new file mode 100644 index 0000000..ca01cb0 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt @@ -0,0 +1 @@ +blafasel öäü
\ No newline at end of file diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png Binary files differnew file mode 100644 index 0000000..afca073 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png Binary files differnew file mode 100644 index 0000000..2a7da6e --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt Binary files differnew file mode 100644 index 0000000..b4ce0ee --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt new file mode 100644 index 0000000..baa1300 --- /dev/null +++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt @@ -0,0 +1 @@ +this is another text with ümläüts
\ No newline at end of file diff --git a/websdk/werkzeug/testsuite/res/test.txt b/websdk/werkzeug/testsuite/res/test.txt new file mode 100644 index 0000000..a8efdcc --- /dev/null +++ b/websdk/werkzeug/testsuite/res/test.txt @@ -0,0 +1 @@ +FOUND diff --git a/websdk/werkzeug/testsuite/routing.py b/websdk/werkzeug/testsuite/routing.py new file mode 100644 index 0000000..77b3cb6 --- /dev/null +++ b/websdk/werkzeug/testsuite/routing.py @@ -0,0 +1,610 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.routing + ~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Routing tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import routing as r +from werkzeug.wrappers import Response +from werkzeug.datastructures import ImmutableDict +from werkzeug.test import create_environ + + +class RoutingTestCase(WerkzeugTestCase): + + def test_basic_routing(self): + map = r.Map([ + r.Rule('/', endpoint='index'), + r.Rule('/foo', endpoint='foo'), + r.Rule('/bar/', endpoint='bar') + ]) + adapter = map.bind('example.org', '/') + assert adapter.match('/') == ('index', {}) + assert adapter.match('/foo') == ('foo', {}) + assert adapter.match('/bar/') == ('bar', {}) + self.assert_raises(r.RequestRedirect, lambda: adapter.match('/bar')) + self.assert_raises(r.NotFound, lambda: adapter.match('/blub')) + + adapter = map.bind('example.org', '/test') + try: + adapter.match('/bar') + except r.RequestRedirect, e: + assert e.new_url == 'http://example.org/test/bar/' + else: + self.fail('Expected request redirect') + + adapter = map.bind('example.org', '/') + try: + adapter.match('/bar') + except r.RequestRedirect, e: + assert e.new_url == 'http://example.org/bar/' + else: + self.fail('Expected request redirect') + + adapter = map.bind('example.org', '/') + try: + adapter.match('/bar', query_args={'aha': 'muhaha'}) + except r.RequestRedirect, e: + assert e.new_url == 'http://example.org/bar/?aha=muhaha' + else: + self.fail('Expected request redirect') + + adapter = map.bind('example.org', '/') + try: + adapter.match('/bar', query_args='aha=muhaha') + except r.RequestRedirect, e: + assert e.new_url == 'http://example.org/bar/?aha=muhaha' + else: + self.fail('Expected request redirect') + + adapter = map.bind_to_environ(create_environ('/bar?foo=bar', + 'http://example.org/')) + try: + adapter.match() + except r.RequestRedirect, e: + assert e.new_url == 'http://example.org/bar/?foo=bar' + else: + self.fail('Expected request redirect') + + def test_environ_defaults(self): + environ = create_environ("/foo") + self.assert_equal(environ["PATH_INFO"], '/foo') + m = r.Map([r.Rule("/foo", endpoint="foo"), r.Rule("/bar", endpoint="bar")]) + a = m.bind_to_environ(environ) + self.assert_equal(a.match("/foo"), ('foo', {})) + self.assert_equal(a.match(), ('foo', {})) + self.assert_equal(a.match("/bar"), ('bar', {})) + self.assert_raises(r.NotFound, a.match, "/bars") + + def test_basic_building(self): + map = r.Map([ + r.Rule('/', endpoint='index'), + r.Rule('/foo', endpoint='foo'), + r.Rule('/bar/<baz>', endpoint='bar'), + r.Rule('/bar/<int:bazi>', endpoint='bari'), + r.Rule('/bar/<float:bazf>', endpoint='barf'), + r.Rule('/bar/<path:bazp>', endpoint='barp'), + r.Rule('/hehe', endpoint='blah', subdomain='blah') + ]) + adapter = map.bind('example.org', '/', subdomain='blah') + + assert adapter.build('index', {}) == 'http://example.org/' + assert adapter.build('foo', {}) == 'http://example.org/foo' + assert adapter.build('bar', {'baz': 'blub'}) == 'http://example.org/bar/blub' + assert adapter.build('bari', {'bazi': 50}) == 'http://example.org/bar/50' + assert adapter.build('barf', {'bazf': 0.815}) == 'http://example.org/bar/0.815' + assert adapter.build('barp', {'bazp': 'la/di'}) == 'http://example.org/bar/la/di' + assert adapter.build('blah', {}) == '/hehe' + self.assert_raises(r.BuildError, lambda: adapter.build('urks')) + + adapter = map.bind('example.org', '/test', subdomain='blah') + assert adapter.build('index', {}) == 'http://example.org/test/' + assert adapter.build('foo', {}) == 'http://example.org/test/foo' + assert adapter.build('bar', {'baz': 'blub'}) == 'http://example.org/test/bar/blub' + assert adapter.build('bari', {'bazi': 50}) == 'http://example.org/test/bar/50' + assert adapter.build('barf', {'bazf': 0.815}) == 'http://example.org/test/bar/0.815' + assert adapter.build('barp', {'bazp': 'la/di'}) == 'http://example.org/test/bar/la/di' + assert adapter.build('blah', {}) == '/test/hehe' + + def test_defaults(self): + map = r.Map([ + r.Rule('/foo/', defaults={'page': 1}, endpoint='foo'), + r.Rule('/foo/<int:page>', endpoint='foo') + ]) + adapter = map.bind('example.org', '/') + + assert adapter.match('/foo/') == ('foo', {'page': 1}) + self.assert_raises(r.RequestRedirect, lambda: adapter.match('/foo/1')) + assert adapter.match('/foo/2') == ('foo', {'page': 2}) + assert adapter.build('foo', {}) == '/foo/' + assert adapter.build('foo', {'page': 1}) == '/foo/' + assert adapter.build('foo', {'page': 2}) == '/foo/2' + + def test_greedy(self): + map = r.Map([ + r.Rule('/foo', endpoint='foo'), + r.Rule('/<path:bar>', endpoint='bar'), + r.Rule('/<path:bar>/<path:blub>', endpoint='bar') + ]) + adapter = map.bind('example.org', '/') + + assert adapter.match('/foo') == ('foo', {}) + assert adapter.match('/blub') == ('bar', {'bar': 'blub'}) + assert adapter.match('/he/he') == ('bar', {'bar': 'he', 'blub': 'he'}) + + assert adapter.build('foo', {}) == '/foo' + assert adapter.build('bar', {'bar': 'blub'}) == '/blub' + assert adapter.build('bar', {'bar': 'blub', 'blub': 'bar'}) == '/blub/bar' + + def test_path(self): + map = r.Map([ + r.Rule('/', defaults={'name': 'FrontPage'}, endpoint='page'), + r.Rule('/Special', endpoint='special'), + r.Rule('/<int:year>', endpoint='year'), + r.Rule('/<path:name>', endpoint='page'), + r.Rule('/<path:name>/edit', endpoint='editpage'), + r.Rule('/<path:name>/silly/<path:name2>', endpoint='sillypage'), + r.Rule('/<path:name>/silly/<path:name2>/edit', endpoint='editsillypage'), + r.Rule('/Talk:<path:name>', endpoint='talk'), + r.Rule('/User:<username>', endpoint='user'), + r.Rule('/User:<username>/<path:name>', endpoint='userpage'), + r.Rule('/Files/<path:file>', endpoint='files'), + ]) + adapter = map.bind('example.org', '/') + + assert adapter.match('/') == ('page', {'name':'FrontPage'}) + self.assert_raises(r.RequestRedirect, lambda: adapter.match('/FrontPage')) + assert adapter.match('/Special') == ('special', {}) + assert adapter.match('/2007') == ('year', {'year':2007}) + assert adapter.match('/Some/Page') == ('page', {'name':'Some/Page'}) + assert adapter.match('/Some/Page/edit') == ('editpage', {'name':'Some/Page'}) + assert adapter.match('/Foo/silly/bar') == ('sillypage', {'name':'Foo', 'name2':'bar'}) + assert adapter.match('/Foo/silly/bar/edit') == ('editsillypage', {'name':'Foo', 'name2':'bar'}) + assert adapter.match('/Talk:Foo/Bar') == ('talk', {'name':'Foo/Bar'}) + assert adapter.match('/User:thomas') == ('user', {'username':'thomas'}) + assert adapter.match('/User:thomas/projects/werkzeug') == \ + ('userpage', {'username':'thomas', 'name':'projects/werkzeug'}) + assert adapter.match('/Files/downloads/werkzeug/0.2.zip') == \ + ('files', {'file':'downloads/werkzeug/0.2.zip'}) + + def test_dispatch(self): + env = create_environ('/') + map = r.Map([ + r.Rule('/', endpoint='root'), + r.Rule('/foo/', endpoint='foo') + ]) + adapter = map.bind_to_environ(env) + + raise_this = None + def view_func(endpoint, values): + if raise_this is not None: + raise raise_this + return Response(repr((endpoint, values))) + dispatch = lambda p, q=False: Response.force_type(adapter.dispatch(view_func, p, + catch_http_exceptions=q), env) + + assert dispatch('/').data == "('root', {})" + assert dispatch('/foo').status_code == 301 + raise_this = r.NotFound() + self.assert_raises(r.NotFound, lambda: dispatch('/bar')) + assert dispatch('/bar', True).status_code == 404 + + def test_http_host_before_server_name(self): + env = { + 'HTTP_HOST': 'wiki.example.com', + 'SERVER_NAME': 'web0.example.com', + 'SERVER_PORT': '80', + 'SCRIPT_NAME': '', + 'PATH_INFO': '', + 'REQUEST_METHOD': 'GET', + 'wsgi.url_scheme': 'http' + } + map = r.Map([r.Rule('/', endpoint='index', subdomain='wiki')]) + adapter = map.bind_to_environ(env, server_name='example.com') + assert adapter.match('/') == ('index', {}) + assert adapter.build('index', force_external=True) == 'http://wiki.example.com/' + assert adapter.build('index') == '/' + + env['HTTP_HOST'] = 'admin.example.com' + adapter = map.bind_to_environ(env, server_name='example.com') + assert adapter.build('index') == 'http://wiki.example.com/' + + def test_adapter_url_parameter_sorting(self): + map = r.Map([r.Rule('/', endpoint='index')], sort_parameters=True, + sort_key=lambda x: x[1]) + adapter = map.bind('localhost', '/') + assert adapter.build('index', {'x': 20, 'y': 10, 'z': 30}, + force_external=True) == 'http://localhost/?y=10&x=20&z=30' + + def test_request_direct_charset_bug(self): + map = r.Map([r.Rule(u'/öäü/')]) + adapter = map.bind('localhost', '/') + try: + adapter.match(u'/öäü') + except r.RequestRedirect, e: + assert e.new_url == 'http://localhost/%C3%B6%C3%A4%C3%BC/' + else: + self.fail('expected request redirect exception') + + def test_request_redirect_default(self): + map = r.Map([r.Rule(u'/foo', defaults={'bar': 42}), + r.Rule(u'/foo/<int:bar>')]) + adapter = map.bind('localhost', '/') + try: + adapter.match(u'/foo/42') + except r.RequestRedirect, e: + assert e.new_url == 'http://localhost/foo' + else: + self.fail('expected request redirect exception') + + def test_request_redirect_default_subdomain(self): + map = r.Map([r.Rule(u'/foo', defaults={'bar': 42}, subdomain='test'), + r.Rule(u'/foo/<int:bar>', subdomain='other')]) + adapter = map.bind('localhost', '/', subdomain='other') + try: + adapter.match(u'/foo/42') + except r.RequestRedirect, e: + assert e.new_url == 'http://test.localhost/foo' + else: + self.fail('expected request redirect exception') + + def test_adapter_match_return_rule(self): + rule = r.Rule('/foo/', endpoint='foo') + map = r.Map([rule]) + adapter = map.bind('localhost', '/') + assert adapter.match('/foo/', return_rule=True) == (rule, {}) + + def test_server_name_interpolation(self): + server_name = 'example.invalid' + map = r.Map([r.Rule('/', endpoint='index'), + r.Rule('/', endpoint='alt', subdomain='alt')]) + + env = create_environ('/', 'http://%s/' % server_name) + adapter = map.bind_to_environ(env, server_name=server_name) + assert adapter.match() == ('index', {}) + + env = create_environ('/', 'http://alt.%s/' % server_name) + adapter = map.bind_to_environ(env, server_name=server_name) + assert adapter.match() == ('alt', {}) + + env = create_environ('/', 'http://%s/' % server_name) + adapter = map.bind_to_environ(env, server_name='foo') + assert adapter.subdomain == '<invalid>' + + def test_rule_emptying(self): + rule = r.Rule('/foo', {'meh': 'muh'}, 'x', ['POST'], + False, 'x', True, None) + rule2 = rule.empty() + assert rule.__dict__ == rule2.__dict__ + rule.methods.add('GET') + assert rule.__dict__ != rule2.__dict__ + rule.methods.discard('GET') + rule.defaults['meh'] = 'aha' + assert rule.__dict__ != rule2.__dict__ + + def test_rule_templates(self): + testcase = r.RuleTemplate( + [ r.Submount('/test/$app', + [ r.Rule('/foo/', endpoint='handle_foo') + , r.Rule('/bar/', endpoint='handle_bar') + , r.Rule('/baz/', endpoint='handle_baz') + ]), + r.EndpointPrefix('${app}', + [ r.Rule('/${app}-blah', endpoint='bar') + , r.Rule('/${app}-meh', endpoint='baz') + ]), + r.Subdomain('$app', + [ r.Rule('/blah', endpoint='x_bar') + , r.Rule('/meh', endpoint='x_baz') + ]) + ]) + + url_map = r.Map( + [ testcase(app='test1') + , testcase(app='test2') + , testcase(app='test3') + , testcase(app='test4') + ]) + + out = sorted([(x.rule, x.subdomain, x.endpoint) + for x in url_map.iter_rules()]) + + assert out == ([ + ('/blah', 'test1', 'x_bar'), + ('/blah', 'test2', 'x_bar'), + ('/blah', 'test3', 'x_bar'), + ('/blah', 'test4', 'x_bar'), + ('/meh', 'test1', 'x_baz'), + ('/meh', 'test2', 'x_baz'), + ('/meh', 'test3', 'x_baz'), + ('/meh', 'test4', 'x_baz'), + ('/test/test1/bar/', '', 'handle_bar'), + ('/test/test1/baz/', '', 'handle_baz'), + ('/test/test1/foo/', '', 'handle_foo'), + ('/test/test2/bar/', '', 'handle_bar'), + ('/test/test2/baz/', '', 'handle_baz'), + ('/test/test2/foo/', '', 'handle_foo'), + ('/test/test3/bar/', '', 'handle_bar'), + ('/test/test3/baz/', '', 'handle_baz'), + ('/test/test3/foo/', '', 'handle_foo'), + ('/test/test4/bar/', '', 'handle_bar'), + ('/test/test4/baz/', '', 'handle_baz'), + ('/test/test4/foo/', '', 'handle_foo'), + ('/test1-blah', '', 'test1bar'), + ('/test1-meh', '', 'test1baz'), + ('/test2-blah', '', 'test2bar'), + ('/test2-meh', '', 'test2baz'), + ('/test3-blah', '', 'test3bar'), + ('/test3-meh', '', 'test3baz'), + ('/test4-blah', '', 'test4bar'), + ('/test4-meh', '', 'test4baz') + ]) + + def test_complex_routing_rules(self): + m = r.Map([ + r.Rule('/', endpoint='index'), + r.Rule('/<int:blub>', endpoint='an_int'), + r.Rule('/<blub>', endpoint='a_string'), + r.Rule('/foo/', endpoint='nested'), + r.Rule('/foobar/', endpoint='nestedbar'), + r.Rule('/foo/<path:testing>/', endpoint='nested_show'), + r.Rule('/foo/<path:testing>/edit', endpoint='nested_edit'), + r.Rule('/users/', endpoint='users', defaults={'page': 1}), + r.Rule('/users/page/<int:page>', endpoint='users'), + r.Rule('/foox', endpoint='foox'), + r.Rule('/<path:bar>/<path:blub>', endpoint='barx_path_path') + ]) + a = m.bind('example.com') + + assert a.match('/') == ('index', {}) + assert a.match('/42') == ('an_int', {'blub': 42}) + assert a.match('/blub') == ('a_string', {'blub': 'blub'}) + assert a.match('/foo/') == ('nested', {}) + assert a.match('/foobar/') == ('nestedbar', {}) + assert a.match('/foo/1/2/3/') == ('nested_show', {'testing': '1/2/3'}) + assert a.match('/foo/1/2/3/edit') == ('nested_edit', {'testing': '1/2/3'}) + assert a.match('/users/') == ('users', {'page': 1}) + assert a.match('/users/page/2') == ('users', {'page': 2}) + assert a.match('/foox') == ('foox', {}) + assert a.match('/1/2/3') == ('barx_path_path', {'bar': '1', 'blub': '2/3'}) + + assert a.build('index') == '/' + assert a.build('an_int', {'blub': 42}) == '/42' + assert a.build('a_string', {'blub': 'test'}) == '/test' + assert a.build('nested') == '/foo/' + assert a.build('nestedbar') == '/foobar/' + assert a.build('nested_show', {'testing': '1/2/3'}) == '/foo/1/2/3/' + assert a.build('nested_edit', {'testing': '1/2/3'}) == '/foo/1/2/3/edit' + assert a.build('users', {'page': 1}) == '/users/' + assert a.build('users', {'page': 2}) == '/users/page/2' + assert a.build('foox') == '/foox' + assert a.build('barx_path_path', {'bar': '1', 'blub': '2/3'}) == '/1/2/3' + + def test_default_converters(self): + class MyMap(r.Map): + default_converters = r.Map.default_converters.copy() + default_converters['foo'] = r.UnicodeConverter + assert isinstance(r.Map.default_converters, ImmutableDict) + m = MyMap([ + r.Rule('/a/<foo:a>', endpoint='a'), + r.Rule('/b/<foo:b>', endpoint='b'), + r.Rule('/c/<c>', endpoint='c') + ], converters={'bar': r.UnicodeConverter}) + a = m.bind('example.org', '/') + assert a.match('/a/1') == ('a', {'a': '1'}) + assert a.match('/b/2') == ('b', {'b': '2'}) + assert a.match('/c/3') == ('c', {'c': '3'}) + assert 'foo' not in r.Map.default_converters + + def test_build_append_unknown(self): + map = r.Map([ + r.Rule('/bar/<float:bazf>', endpoint='barf') + ]) + adapter = map.bind('example.org', '/', subdomain='blah') + assert adapter.build('barf', {'bazf': 0.815, 'bif' : 1.0}) == \ + 'http://example.org/bar/0.815?bif=1.0' + assert adapter.build('barf', {'bazf': 0.815, 'bif' : 1.0}, + append_unknown=False) == 'http://example.org/bar/0.815' + + def test_method_fallback(self): + map = r.Map([ + r.Rule('/', endpoint='index', methods=['GET']), + r.Rule('/<name>', endpoint='hello_name', methods=['GET']), + r.Rule('/select', endpoint='hello_select', methods=['POST']), + r.Rule('/search_get', endpoint='search', methods=['GET']), + r.Rule('/search_post', endpoint='search', methods=['POST']) + ]) + adapter = map.bind('example.com') + assert adapter.build('index') == '/' + assert adapter.build('index', method='GET') == '/' + assert adapter.build('hello_name', {'name': 'foo'}) == '/foo' + assert adapter.build('hello_select') == '/select' + assert adapter.build('hello_select', method='POST') == '/select' + assert adapter.build('search') == '/search_get' + assert adapter.build('search', method='GET') == '/search_get' + assert adapter.build('search', method='POST') == '/search_post' + + def test_implicit_head(self): + url_map = r.Map([ + r.Rule('/get', methods=['GET'], endpoint='a'), + r.Rule('/post', methods=['POST'], endpoint='b') + ]) + adapter = url_map.bind('example.org') + assert adapter.match('/get', method='HEAD') == ('a', {}) + self.assert_raises(r.MethodNotAllowed, adapter.match, + '/post', method='HEAD') + + def test_protocol_joining_bug(self): + m = r.Map([r.Rule('/<foo>', endpoint='x')]) + a = m.bind('example.org') + assert a.build('x', {'foo': 'x:y'}) == '/x:y' + assert a.build('x', {'foo': 'x:y'}, force_external=True) == \ + 'http://example.org/x:y' + + def test_allowed_methods_querying(self): + m = r.Map([r.Rule('/<foo>', methods=['GET', 'HEAD']), + r.Rule('/foo', methods=['POST'])]) + a = m.bind('example.org') + assert sorted(a.allowed_methods('/foo')) == ['GET', 'HEAD', 'POST'] + + def test_external_building_with_port(self): + map = r.Map([ + r.Rule('/', endpoint='index'), + ]) + adapter = map.bind('example.org:5000', '/') + built_url = adapter.build('index', {}, force_external=True) + assert built_url == 'http://example.org:5000/', built_url + + def test_external_building_with_port_bind_to_environ(self): + map = r.Map([ + r.Rule('/', endpoint='index'), + ]) + adapter = map.bind_to_environ( + create_environ('/', 'http://example.org:5000/'), + server_name="example.org:5000" + ) + built_url = adapter.build('index', {}, force_external=True) + assert built_url == 'http://example.org:5000/', built_url + + def test_external_building_with_port_bind_to_environ_wrong_servername(self): + map = r.Map([ + r.Rule('/', endpoint='index'), + ]) + environ = create_environ('/', 'http://example.org:5000/') + adapter = map.bind_to_environ(environ, server_name="example.org") + assert adapter.subdomain == '<invalid>' + + def test_converter_parser(self): + args, kwargs = r.parse_converter_args(u'test, a=1, b=3.0') + + assert args == ('test',) + assert kwargs == {'a': 1, 'b': 3.0 } + + args, kwargs = r.parse_converter_args('') + assert not args and not kwargs + + args, kwargs = r.parse_converter_args('a, b, c,') + assert args == ('a', 'b', 'c') + assert not kwargs + + args, kwargs = r.parse_converter_args('True, False, None') + assert args == (True, False, None) + + args, kwargs = r.parse_converter_args('"foo", u"bar"') + assert args == ('foo', 'bar') + + def test_alias_redirects(self): + m = r.Map([ + r.Rule('/', endpoint='index'), + r.Rule('/index.html', endpoint='index', alias=True), + r.Rule('/users/', defaults={'page': 1}, endpoint='users'), + r.Rule('/users/index.html', defaults={'page': 1}, alias=True, + endpoint='users'), + r.Rule('/users/page/<int:page>', endpoint='users'), + r.Rule('/users/page-<int:page>.html', alias=True, endpoint='users'), + ]) + a = m.bind('example.com') + + def ensure_redirect(path, new_url, args=None): + try: + a.match(path, query_args=args) + except r.RequestRedirect, e: + assert e.new_url == 'http://example.com' + new_url + else: + assert False, 'expected redirect' + + ensure_redirect('/index.html', '/') + ensure_redirect('/users/index.html', '/users/') + ensure_redirect('/users/page-2.html', '/users/page/2') + ensure_redirect('/users/page-1.html', '/users/') + ensure_redirect('/users/page-1.html', '/users/?foo=bar', {'foo': 'bar'}) + + assert a.build('index') == '/' + assert a.build('users', {'page': 1}) == '/users/' + assert a.build('users', {'page': 2}) == '/users/page/2' + + def test_double_defaults(self): + for prefix in '', '/aaa': + m = r.Map([ + r.Rule(prefix + '/', defaults={'foo': 1, 'bar': False}, endpoint='x'), + r.Rule(prefix + '/<int:foo>', defaults={'bar': False}, endpoint='x'), + r.Rule(prefix + '/bar/', defaults={'foo': 1, 'bar': True}, endpoint='x'), + r.Rule(prefix + '/bar/<int:foo>', defaults={'bar': True}, endpoint='x') + ]) + a = m.bind('example.com') + + assert a.match(prefix + '/') == ('x', {'foo': 1, 'bar': False}) + assert a.match(prefix + '/2') == ('x', {'foo': 2, 'bar': False}) + assert a.match(prefix + '/bar/') == ('x', {'foo': 1, 'bar': True}) + assert a.match(prefix + '/bar/2') == ('x', {'foo': 2, 'bar': True}) + + assert a.build('x', {'foo': 1, 'bar': False}) == prefix + '/' + assert a.build('x', {'foo': 2, 'bar': False}) == prefix + '/2' + assert a.build('x', {'bar': False}) == prefix + '/' + assert a.build('x', {'foo': 1, 'bar': True}) == prefix + '/bar/' + assert a.build('x', {'foo': 2, 'bar': True}) == prefix + '/bar/2' + assert a.build('x', {'bar': True}) == prefix + '/bar/' + + def test_host_matching(self): + m = r.Map([ + r.Rule('/', endpoint='index', host='www.<domain>'), + r.Rule('/', endpoint='files', host='files.<domain>'), + r.Rule('/foo/', defaults={'page': 1}, host='www.<domain>', endpoint='x'), + r.Rule('/<int:page>', host='files.<domain>', endpoint='x') + ], host_matching=True) + + a = m.bind('www.example.com') + assert a.match('/') == ('index', {'domain': 'example.com'}) + assert a.match('/foo/') == ('x', {'domain': 'example.com', 'page': 1}) + try: + a.match('/foo') + except r.RequestRedirect, e: + assert e.new_url == 'http://www.example.com/foo/' + else: + assert False, 'expected redirect' + + a = m.bind('files.example.com') + assert a.match('/') == ('files', {'domain': 'example.com'}) + assert a.match('/2') == ('x', {'domain': 'example.com', 'page': 2}) + try: + a.match('/1') + except r.RequestRedirect, e: + assert e.new_url == 'http://www.example.com/foo/' + else: + assert False, 'expected redirect' + + def test_server_name_casing(self): + m = r.Map([ + r.Rule('/', endpoint='index', subdomain='foo') + ]) + + env = create_environ() + env['SERVER_NAME'] = env['HTTP_HOST'] = 'FOO.EXAMPLE.COM' + a = m.bind_to_environ(env, server_name='example.com') + assert a.match('/') == ('index', {}) + + env = create_environ() + env['SERVER_NAME'] = '127.0.0.1' + env['SERVER_PORT'] = '5000' + del env['HTTP_HOST'] + a = m.bind_to_environ(env, server_name='example.com') + try: + a.match() + except r.NotFound: + pass + else: + assert False, 'Expected not found exception' + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(RoutingTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/security.py b/websdk/werkzeug/testsuite/security.py new file mode 100644 index 0000000..bdd0b34 --- /dev/null +++ b/websdk/werkzeug/testsuite/security.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.security + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the security helpers. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import os +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.security import check_password_hash, generate_password_hash, \ + safe_join + + +class SecurityTestCase(WerkzeugTestCase): + + def test_password_hashing(self): + """Test the password hashing and password hash checking""" + hash1 = generate_password_hash('default') + hash2 = generate_password_hash(u'default', method='sha1') + assert hash1 != hash2 + assert check_password_hash(hash1, 'default') + assert check_password_hash(hash2, 'default') + assert hash1.startswith('sha1$') + assert hash2.startswith('sha1$') + + fakehash = generate_password_hash('default', method='plain') + assert fakehash == 'plain$$default' + assert check_password_hash(fakehash, 'default') + + mhash = generate_password_hash(u'default', method='md5') + assert mhash.startswith('md5$') + assert check_password_hash(mhash, 'default') + + legacy = 'md5$$c21f969b5f03d33d43e04f8f136e7682' + assert check_password_hash(legacy, 'default') + + legacy = u'md5$$c21f969b5f03d33d43e04f8f136e7682' + assert check_password_hash(legacy, 'default') + + def test_safe_join(self): + """Test the safe joining helper""" + assert safe_join('foo', 'bar/baz') == os.path.join('foo', 'bar/baz') + assert safe_join('foo', '../bar/baz') is None + if os.name == 'nt': + assert safe_join('foo', 'foo\\bar') is None + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SecurityTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/serving.py b/websdk/werkzeug/testsuite/serving.py new file mode 100644 index 0000000..e26271d --- /dev/null +++ b/websdk/werkzeug/testsuite/serving.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.serving + ~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Added serving tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import sys +import time +import urllib +import unittest +from functools import update_wrapper +from StringIO import StringIO + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import __version__ as version, serving +from werkzeug.testapp import test_app +from threading import Thread + + +real_make_server = serving.make_server + + +def silencestderr(f): + def new_func(*args, **kwargs): + old_stderr = sys.stderr + sys.stderr = StringIO() + try: + return f(*args, **kwargs) + finally: + sys.stderr = old_stderr + return update_wrapper(new_func, f) + + +def run_dev_server(application): + servers = [] + def tracking_make_server(*args, **kwargs): + srv = real_make_server(*args, **kwargs) + servers.append(srv) + return srv + serving.make_server = tracking_make_server + try: + t = Thread(target=serving.run_simple, args=('localhost', 0, application)) + t.setDaemon(True) + t.start() + time.sleep(0.25) + finally: + serving.make_server = real_make_server + if not servers: + return None, None + server ,= servers + ip, port = server.socket.getsockname()[:2] + if ':' in ip: + ip = '[%s]' % ip + return server, '%s:%d' % (ip, port) + + +class ServingTestCase(WerkzeugTestCase): + + @silencestderr + def test_serving(self): + server, addr = run_dev_server(test_app) + rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read() + assert 'WSGI Information' in rv + assert 'foo=bar&baz=blah' in rv + assert ('Werkzeug/%s' % version) in rv + + @silencestderr + def test_broken_app(self): + def broken_app(environ, start_response): + 1/0 + server, addr = run_dev_server(broken_app) + rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read() + assert 'Internal Server Error' in rv + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ServingTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/test.py b/websdk/werkzeug/testsuite/test.py new file mode 100644 index 0000000..795de90 --- /dev/null +++ b/websdk/werkzeug/testsuite/test.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.test + ~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the testing tools. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import sys +import unittest +from cStringIO import StringIO, OutputType + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.wrappers import Request, Response, BaseResponse +from werkzeug.test import Client, EnvironBuilder, create_environ, \ + ClientRedirectError, stream_encode_multipart, run_wsgi_app +from werkzeug.utils import redirect +from werkzeug.formparser import parse_form_data +from werkzeug.datastructures import MultiDict + + +def cookie_app(environ, start_response): + """A WSGI application which sets a cookie, and returns as a ersponse any + cookie which exists. + """ + response = Response(environ.get('HTTP_COOKIE', 'No Cookie'), + mimetype='text/plain') + response.set_cookie('test', 'test') + return response(environ, start_response) + + +def redirect_loop_app(environ, start_response): + response = redirect('http://localhost/some/redirect/') + return response(environ, start_response) + + +def redirect_with_get_app(environ, start_response): + req = Request(environ) + if req.url not in ('http://localhost/', + 'http://localhost/first/request', + 'http://localhost/some/redirect/'): + assert False, 'redirect_demo_app() did not expect URL "%s"' % req.url + if '/some/redirect' not in req.url: + response = redirect('http://localhost/some/redirect/') + else: + response = Response('current url: %s' % req.url) + return response(environ, start_response) + + +def redirect_with_post_app(environ, start_response): + req = Request(environ) + if req.url == 'http://localhost/some/redirect/': + assert req.method == 'GET', 'request should be GET' + assert not req.form, 'request should not have data' + response = Response('current url: %s' % req.url) + else: + response = redirect('http://localhost/some/redirect/') + return response(environ, start_response) + + +def external_redirect_demo_app(environ, start_response): + response = redirect('http://example.com/') + return response(environ, start_response) + + +def external_subdomain_redirect_demo_app(environ, start_response): + if 'test.example.com' in environ['HTTP_HOST']: + response = Response('redirected successfully to subdomain') + else: + response = redirect('http://test.example.com/login') + return response(environ, start_response) + + +def multi_value_post_app(environ, start_response): + req = Request(environ) + assert req.form['field'] == 'val1', req.form['field'] + assert req.form.getlist('field') == ['val1', 'val2'], req.form.getlist('field') + response = Response('ok') + return response(environ, start_response) + + +class TestTestCase(WerkzeugTestCase): + + def test_cookie_forging(self): + c = Client(cookie_app) + c.set_cookie('localhost', 'foo', 'bar') + appiter, code, headers = c.open() + assert list(appiter) == ['foo=bar'] + + def test_set_cookie_app(self): + c = Client(cookie_app) + appiter, code, headers = c.open() + assert 'Set-Cookie' in dict(headers) + + def test_cookiejar_stores_cookie(self): + c = Client(cookie_app) + appiter, code, headers = c.open() + assert 'test' in c.cookie_jar._cookies['localhost.local']['/'] + + def test_no_initial_cookie(self): + c = Client(cookie_app) + appiter, code, headers = c.open() + assert ''.join(appiter) == 'No Cookie' + + def test_resent_cookie(self): + c = Client(cookie_app) + c.open() + appiter, code, headers = c.open() + assert ''.join(appiter) == 'test=test' + + def test_disable_cookies(self): + c = Client(cookie_app, use_cookies=False) + c.open() + appiter, code, headers = c.open() + assert ''.join(appiter) == 'No Cookie' + + def test_cookie_for_different_path(self): + c = Client(cookie_app) + c.open('/path1') + appiter, code, headers = c.open('/path2') + assert ''.join(appiter) == 'test=test' + + def test_environ_builder_basics(self): + b = EnvironBuilder() + assert b.content_type is None + b.method = 'POST' + assert b.content_type == 'application/x-www-form-urlencoded' + b.files.add_file('test', StringIO('test contents'), 'test.txt') + assert b.files['test'].content_type == 'text/plain' + assert b.content_type == 'multipart/form-data' + b.form['test'] = 'normal value' + + req = b.get_request() + b.close() + + assert req.url == 'http://localhost/' + assert req.method == 'POST' + assert req.form['test'] == 'normal value' + assert req.files['test'].content_type == 'text/plain' + assert req.files['test'].filename == 'test.txt' + assert req.files['test'].read() == 'test contents' + + def test_environ_builder_headers(self): + b = EnvironBuilder(environ_base={'HTTP_USER_AGENT': 'Foo/0.1'}, + environ_overrides={'wsgi.version': (1, 1)}) + b.headers['X-Suck-My-Dick'] = 'very well sir' + env = b.get_environ() + assert env['HTTP_USER_AGENT'] == 'Foo/0.1' + assert env['HTTP_X_SUCK_MY_DICK'] == 'very well sir' + assert env['wsgi.version'] == (1, 1) + + b.headers['User-Agent'] = 'Bar/1.0' + env = b.get_environ() + assert env['HTTP_USER_AGENT'] == 'Bar/1.0' + + def test_environ_builder_paths(self): + b = EnvironBuilder(path='/foo', base_url='http://example.com/') + assert b.base_url == 'http://example.com/' + assert b.path == '/foo' + assert b.script_root == '' + assert b.host == 'example.com' + + b = EnvironBuilder(path='/foo', base_url='http://example.com/bar') + assert b.base_url == 'http://example.com/bar/' + assert b.path == '/foo' + assert b.script_root == '/bar' + assert b.host == 'example.com' + + b.host = 'localhost' + assert b.base_url == 'http://localhost/bar/' + b.base_url = 'http://localhost:8080/' + assert b.host == 'localhost:8080' + assert b.server_name == 'localhost' + assert b.server_port == 8080 + + b.host = 'foo.invalid' + b.url_scheme = 'https' + b.script_root = '/test' + env = b.get_environ() + assert env['SERVER_NAME'] == 'foo.invalid' + assert env['SERVER_PORT'] == '443' + assert env['SCRIPT_NAME'] == '/test' + assert env['PATH_INFO'] == '/foo' + assert env['HTTP_HOST'] == 'foo.invalid' + assert env['wsgi.url_scheme'] == 'https' + assert b.base_url == 'https://foo.invalid/test/' + + def test_environ_builder_content_type(self): + builder = EnvironBuilder() + assert builder.content_type is None + builder.method = 'POST' + assert builder.content_type == 'application/x-www-form-urlencoded' + builder.form['foo'] = 'bar' + assert builder.content_type == 'application/x-www-form-urlencoded' + builder.files.add_file('blafasel', StringIO('foo'), 'test.txt') + assert builder.content_type == 'multipart/form-data' + req = builder.get_request() + assert req.form['foo'] == 'bar' + assert req.files['blafasel'].read() == 'foo' + + def test_environ_builder_stream_switch(self): + d = MultiDict(dict(foo=u'bar', blub=u'blah', hu=u'hum')) + for use_tempfile in False, True: + stream, length, boundary = stream_encode_multipart( + d, use_tempfile, threshold=150) + assert isinstance(stream, OutputType) != use_tempfile + + form = parse_form_data({'wsgi.input': stream, 'CONTENT_LENGTH': str(length), + 'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' % + boundary})[1] + assert form == d + + def test_create_environ(self): + env = create_environ('/foo?bar=baz', 'http://example.org/') + expected = { + 'wsgi.multiprocess': False, + 'wsgi.version': (1, 0), + 'wsgi.run_once': False, + 'wsgi.errors': sys.stderr, + 'wsgi.multithread': False, + 'wsgi.url_scheme': 'http', + 'SCRIPT_NAME': '', + 'CONTENT_TYPE': '', + 'CONTENT_LENGTH': '0', + 'SERVER_NAME': 'example.org', + 'REQUEST_METHOD': 'GET', + 'HTTP_HOST': 'example.org', + 'PATH_INFO': '/foo', + 'SERVER_PORT': '80', + 'SERVER_PROTOCOL': 'HTTP/1.1', + 'QUERY_STRING': 'bar=baz' + } + for key, value in expected.iteritems(): + assert env[key] == value + assert env['wsgi.input'].read(0) == '' + + assert create_environ('/foo', 'http://example.com/')['SCRIPT_NAME'] == '' + + def test_file_closing(self): + closed = [] + class SpecialInput(object): + def read(self): + return '' + def close(self): + closed.append(self) + + env = create_environ(data={'foo': SpecialInput()}) + assert len(closed) == 1 + builder = EnvironBuilder() + builder.files.add_file('blah', SpecialInput()) + builder.close() + assert len(closed) == 2 + + def test_follow_redirect(self): + env = create_environ('/', base_url='http://localhost') + c = Client(redirect_with_get_app) + appiter, code, headers = c.open(environ_overrides=env, follow_redirects=True) + assert code == '200 OK' + assert ''.join(appiter) == 'current url: http://localhost/some/redirect/' + + # Test that the :cls:`Client` is aware of user defined response wrappers + c = Client(redirect_with_get_app, response_wrapper=BaseResponse) + resp = c.get('/', follow_redirects=True) + assert resp.status_code == 200 + assert resp.data == 'current url: http://localhost/some/redirect/' + + # test with URL other than '/' to make sure redirected URL's are correct + c = Client(redirect_with_get_app, response_wrapper=BaseResponse) + resp = c.get('/first/request', follow_redirects=True) + assert resp.status_code == 200 + assert resp.data == 'current url: http://localhost/some/redirect/' + + def test_follow_external_redirect(self): + env = create_environ('/', base_url='http://localhost') + c = Client(external_redirect_demo_app) + self.assert_raises(RuntimeError, lambda: + c.get(environ_overrides=env, follow_redirects=True)) + + def test_follow_external_redirect_on_same_subdomain(self): + env = create_environ('/', base_url='http://example.com') + c = Client(external_subdomain_redirect_demo_app, allow_subdomain_redirects=True) + c.get(environ_overrides=env, follow_redirects=True) + + # check that this does not work for real external domains + env = create_environ('/', base_url='http://localhost') + self.assert_raises(RuntimeError, lambda: + c.get(environ_overrides=env, follow_redirects=True)) + + # check that subdomain redirects fail if no `allow_subdomain_redirects` is applied + c = Client(external_subdomain_redirect_demo_app) + self.assert_raises(RuntimeError, lambda: + c.get(environ_overrides=env, follow_redirects=True)) + + def test_follow_redirect_loop(self): + c = Client(redirect_loop_app, response_wrapper=BaseResponse) + with self.assert_raises(ClientRedirectError): + resp = c.get('/', follow_redirects=True) + + def test_follow_redirect_with_post(self): + c = Client(redirect_with_post_app, response_wrapper=BaseResponse) + resp = c.post('/', follow_redirects=True, data='foo=blub+hehe&blah=42') + assert resp.status_code == 200 + assert resp.data == 'current url: http://localhost/some/redirect/' + + def test_path_info_script_name_unquoting(self): + def test_app(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/plain')]) + return [environ['PATH_INFO'] + '\n' + environ['SCRIPT_NAME']] + c = Client(test_app, response_wrapper=BaseResponse) + resp = c.get('/foo%40bar') + assert resp.data == '/foo@bar\n' + c = Client(test_app, response_wrapper=BaseResponse) + resp = c.get('/foo%40bar', 'http://localhost/bar%40baz') + assert resp.data == '/foo@bar\n/bar@baz' + + def test_multi_value_submit(self): + c = Client(multi_value_post_app, response_wrapper=BaseResponse) + data = { + 'field': ['val1','val2'] + } + resp = c.post('/', data=data) + assert resp.status_code == 200 + c = Client(multi_value_post_app, response_wrapper=BaseResponse) + data = MultiDict({ + 'field': ['val1','val2'] + }) + resp = c.post('/', data=data) + assert resp.status_code == 200 + + def test_iri_support(self): + b = EnvironBuilder(u'/föö-bar', base_url=u'http://☃.net/') + assert b.path == '/f%C3%B6%C3%B6-bar' + assert b.base_url == 'http://xn--n3h.net/' + + def test_run_wsgi_apps(self): + def simple_app(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/html')]) + return ['Hello World!'] + app_iter, status, headers = run_wsgi_app(simple_app, {}) + assert status == '200 OK' + assert headers == [('Content-Type', 'text/html')] + assert app_iter == ['Hello World!'] + + def yielding_app(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/html')]) + yield 'Hello ' + yield 'World!' + app_iter, status, headers = run_wsgi_app(yielding_app, {}) + assert status == '200 OK' + assert headers == [('Content-Type', 'text/html')] + assert list(app_iter) == ['Hello ', 'World!'] + + def test_multiple_cookies(self): + @Request.application + def test_app(request): + response = Response(repr(sorted(request.cookies.items()))) + response.set_cookie('test1', 'foo') + response.set_cookie('test2', 'bar') + return response + client = Client(test_app, Response) + resp = client.get('/') + assert resp.data == '[]' + resp = client.get('/') + assert resp.data == "[('test1', u'foo'), ('test2', u'bar')]" + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/urls.py b/websdk/werkzeug/testsuite/urls.py new file mode 100644 index 0000000..8c908b8 --- /dev/null +++ b/websdk/werkzeug/testsuite/urls.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.urls + ~~~~~~~~~~~~~~~~~~~~~~~ + + URL helper tests. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +import unittest +from StringIO import StringIO + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.datastructures import OrderedMultiDict +from werkzeug import urls + + +class URLsTestCase(WerkzeugTestCase): + + def test_quoting(self): + assert urls.url_quote(u'\xf6\xe4\xfc') == '%C3%B6%C3%A4%C3%BC' + assert urls.url_unquote(urls.url_quote(u'#%="\xf6')) == u'#%="\xf6' + assert urls.url_quote_plus('foo bar') == 'foo+bar' + assert urls.url_unquote_plus('foo+bar') == 'foo bar' + assert urls.url_encode({'a': None, 'b': 'foo bar'}) == 'b=foo+bar' + assert urls.url_fix(u'http://de.wikipedia.org/wiki/Elf (Begriffsklärung)') == \ + 'http://de.wikipedia.org/wiki/Elf%20%28Begriffskl%C3%A4rung%29' + + def test_url_decoding(self): + x = urls.url_decode('foo=42&bar=23&uni=H%C3%A4nsel') + assert x['foo'] == '42' + assert x['bar'] == '23' + assert x['uni'] == u'Hänsel' + + x = urls.url_decode('foo=42;bar=23;uni=H%C3%A4nsel', separator=';') + assert x['foo'] == '42' + assert x['bar'] == '23' + assert x['uni'] == u'Hänsel' + + x = urls.url_decode('%C3%9Ch=H%C3%A4nsel', decode_keys=True) + assert x[u'Üh'] == u'Hänsel' + + def test_streamed_url_decoding(self): + item1 = 'a' * 100000 + item2 = 'b' * 400 + string = 'a=%s&b=%s&c=%s' % (item1, item2, item2) + gen = urls.url_decode_stream(StringIO(string), limit=len(string), + return_iterator=True) + self.assert_equal(gen.next(), ('a', item1)) + self.assert_equal(gen.next(), ('b', item2)) + self.assert_equal(gen.next(), ('c', item2)) + self.assert_raises(StopIteration, gen.next) + + def test_url_encoding(self): + assert urls.url_encode({'foo': 'bar 45'}) == 'foo=bar+45' + d = {'foo': 1, 'bar': 23, 'blah': u'Hänsel'} + assert urls.url_encode(d, sort=True) == 'bar=23&blah=H%C3%A4nsel&foo=1' + assert urls.url_encode(d, sort=True, separator=';') == 'bar=23;blah=H%C3%A4nsel;foo=1' + + def test_sorted_url_encode(self): + assert urls.url_encode({"a": 42, "b": 23, 1: 1, 2: 2}, sort=True) == '1=1&2=2&a=42&b=23' + assert urls.url_encode({'A': 1, 'a': 2, 'B': 3, 'b': 4}, sort=True, + key=lambda x: x[0].lower()) == 'A=1&a=2&B=3&b=4' + + def test_streamed_url_encoding(self): + out = StringIO() + urls.url_encode_stream({'foo': 'bar 45'}, out) + self.assert_equal(out.getvalue(), 'foo=bar+45') + + d = {'foo': 1, 'bar': 23, 'blah': u'Hänsel'} + out = StringIO() + urls.url_encode_stream(d, out, sort=True) + self.assert_equal(out.getvalue(), 'bar=23&blah=H%C3%A4nsel&foo=1') + out = StringIO() + urls.url_encode_stream(d, out, sort=True, separator=';') + self.assert_equal(out.getvalue(), 'bar=23;blah=H%C3%A4nsel;foo=1') + + gen = urls.url_encode_stream(d, sort=True) + self.assert_equal(gen.next(), 'bar=23') + self.assert_equal(gen.next(), 'blah=H%C3%A4nsel') + self.assert_equal(gen.next(), 'foo=1') + self.assert_raises(StopIteration, gen.next) + + def test_url_fixing(self): + x = urls.url_fix(u'http://de.wikipedia.org/wiki/Elf (Begriffskl\xe4rung)') + assert x == 'http://de.wikipedia.org/wiki/Elf%20%28Begriffskl%C3%A4rung%29' + + x = urls.url_fix('http://example.com/?foo=%2f%2f') + assert x == 'http://example.com/?foo=%2f%2f' + + def test_iri_support(self): + self.assert_raises(UnicodeError, urls.uri_to_iri, u'http://föö.com/') + self.assert_raises(UnicodeError, urls.iri_to_uri, 'http://föö.com/') + assert urls.uri_to_iri('http://xn--n3h.net/') == u'http://\u2603.net/' + assert urls.uri_to_iri('http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th') == \ + u'http://\xfcser:p\xe4ssword@\u2603.net/p\xe5th' + assert urls.iri_to_uri(u'http://☃.net/') == 'http://xn--n3h.net/' + assert urls.iri_to_uri(u'http://üser:pässword@☃.net/påth') == \ + 'http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th' + + assert urls.uri_to_iri('http://test.com/%3Fmeh?foo=%26%2F') == \ + u'http://test.com/%3Fmeh?foo=%26%2F' + + # this should work as well, might break on 2.4 because of a broken + # idna codec + assert urls.uri_to_iri('/foo') == u'/foo' + assert urls.iri_to_uri(u'/foo') == '/foo' + + def test_ordered_multidict_encoding(self): + d = OrderedMultiDict() + d.add('foo', 1) + d.add('foo', 2) + d.add('foo', 3) + d.add('bar', 0) + d.add('foo', 4) + assert urls.url_encode(d) == 'foo=1&foo=2&foo=3&bar=0&foo=4' + + def test_href(self): + x = urls.Href('http://www.example.com/') + assert x('foo') == 'http://www.example.com/foo' + assert x.foo('bar') == 'http://www.example.com/foo/bar' + assert x.foo('bar', x=42) == 'http://www.example.com/foo/bar?x=42' + assert x.foo('bar', class_=42) == 'http://www.example.com/foo/bar?class=42' + assert x.foo('bar', {'class': 42}) == 'http://www.example.com/foo/bar?class=42' + self.assert_raises(AttributeError, lambda: x.__blah__) + + x = urls.Href('blah') + assert x.foo('bar') == 'blah/foo/bar' + + self.assert_raises(TypeError, x.foo, {"foo": 23}, x=42) + + x = urls.Href('') + assert x('foo') == 'foo' + + def test_href_url_join(self): + x = urls.Href('test') + assert x('foo:bar') == 'test/foo:bar' + assert x('http://example.com/') == 'test/http://example.com/' + + if 0: + # stdlib bug? :( + def test_href_past_root(self): + base_href = urls.Href('http://www.blagga.com/1/2/3') + assert base_href('../foo') == 'http://www.blagga.com/1/2/foo' + assert base_href('../../foo') == 'http://www.blagga.com/1/foo' + assert base_href('../../../foo') == 'http://www.blagga.com/foo' + assert base_href('../../../../foo') == 'http://www.blagga.com/foo' + assert base_href('../../../../../foo') == 'http://www.blagga.com/foo' + assert base_href('../../../../../../foo') == 'http://www.blagga.com/foo' + + def test_url_unquote_plus_unicode(self): + # was broken in 0.6 + assert urls.url_unquote_plus(u'\x6d') == u'\x6d' + assert type(urls.url_unquote_plus(u'\x6d')) is unicode + + def test_quoting_of_local_urls(self): + rv = urls.iri_to_uri(u'/foo\x8f') + assert rv == '/foo%C2%8F' + assert type(rv) is str + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(URLsTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/utils.py b/websdk/werkzeug/testsuite/utils.py new file mode 100644 index 0000000..5ef953c --- /dev/null +++ b/websdk/werkzeug/testsuite/utils.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.utils + ~~~~~~~~~~~~~~~~~~~~~~~~ + + General utilities. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest +from datetime import datetime + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import utils +from werkzeug.datastructures import Headers +from werkzeug.http import parse_date, http_date +from werkzeug.wrappers import BaseResponse +from werkzeug.test import Client, run_wsgi_app + + +class GeneralUtilityTestCase(WerkzeugTestCase): + + def test_redirect(self): + resp = utils.redirect(u'/füübär') + assert '/f%C3%BC%C3%BCb%C3%A4r' in resp.data + assert resp.headers['Location'] == '/f%C3%BC%C3%BCb%C3%A4r' + assert resp.status_code == 302 + + resp = utils.redirect(u'http://☃.net/', 307) + assert 'http://xn--n3h.net/' in resp.data + assert resp.headers['Location'] == 'http://xn--n3h.net/' + assert resp.status_code == 307 + + resp = utils.redirect('http://example.com/', 305) + assert resp.headers['Location'] == 'http://example.com/' + assert resp.status_code == 305 + + def test_cached_property(self): + foo = [] + class A(object): + def prop(self): + foo.append(42) + return 42 + prop = utils.cached_property(prop) + + a = A() + p = a.prop + q = a.prop + assert p == q == 42 + assert foo == [42] + + foo = [] + class A(object): + def _prop(self): + foo.append(42) + return 42 + prop = utils.cached_property(_prop, name='prop') + del _prop + + a = A() + p = a.prop + q = a.prop + assert p == q == 42 + assert foo == [42] + + def test_environ_property(self): + class A(object): + environ = {'string': 'abc', 'number': '42'} + + string = utils.environ_property('string') + missing = utils.environ_property('missing', 'spam') + read_only = utils.environ_property('number') + number = utils.environ_property('number', load_func=int) + broken_number = utils.environ_property('broken_number', load_func=int) + date = utils.environ_property('date', None, parse_date, http_date, + read_only=False) + foo = utils.environ_property('foo') + + a = A() + assert a.string == 'abc' + assert a.missing == 'spam' + def test_assign(): + a.read_only = 'something' + self.assert_raises(AttributeError, test_assign) + assert a.number == 42 + assert a.broken_number == None + assert a.date is None + a.date = datetime(2008, 1, 22, 10, 0, 0, 0) + assert a.environ['date'] == 'Tue, 22 Jan 2008 10:00:00 GMT' + + def test_escape(self): + class Foo(str): + def __html__(self): + return unicode(self) + assert utils.escape(None) == '' + assert utils.escape(42) == '42' + assert utils.escape('<>') == '<>' + assert utils.escape('"foo"') == '"foo"' + assert utils.escape('"foo"', True) == '"foo"' + assert utils.escape(Foo('<foo>')) == '<foo>' + + def test_unescape(self): + assert utils.unescape('<ä>') == u'<ä>' + + def test_run_wsgi_app(self): + def foo(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/plain')]) + yield '1' + yield '2' + yield '3' + + app_iter, status, headers = run_wsgi_app(foo, {}) + assert status == '200 OK' + assert headers == [('Content-Type', 'text/plain')] + assert app_iter.next() == '1' + assert app_iter.next() == '2' + assert app_iter.next() == '3' + self.assert_raises(StopIteration, app_iter.next) + + got_close = [] + class CloseIter(object): + def __init__(self): + self.iterated = False + def __iter__(self): + return self + def close(self): + got_close.append(None) + def next(self): + if self.iterated: + raise StopIteration() + self.iterated = True + return 'bar' + + def bar(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/plain')]) + return CloseIter() + + app_iter, status, headers = run_wsgi_app(bar, {}) + assert status == '200 OK' + assert headers == [('Content-Type', 'text/plain')] + assert app_iter.next() == 'bar' + self.assert_raises(StopIteration, app_iter.next) + app_iter.close() + + assert run_wsgi_app(bar, {}, True)[0] == ['bar'] + + assert len(got_close) == 2 + + def test_import_string(self): + import cgi + from werkzeug.debug import DebuggedApplication + assert utils.import_string('cgi.escape') is cgi.escape + assert utils.import_string(u'cgi.escape') is cgi.escape + assert utils.import_string('cgi:escape') is cgi.escape + assert utils.import_string('XXXXXXXXXXXX', True) is None + assert utils.import_string('cgi.XXXXXXXXXXXX', True) is None + assert utils.import_string(u'cgi.escape') is cgi.escape + assert utils.import_string(u'werkzeug.debug.DebuggedApplication') is DebuggedApplication + self.assert_raises(ImportError, utils.import_string, 'XXXXXXXXXXXXXXXX') + self.assert_raises(ImportError, utils.import_string, 'cgi.XXXXXXXXXX') + + def test_find_modules(self): + assert list(utils.find_modules('werkzeug.debug')) == \ + ['werkzeug.debug.console', 'werkzeug.debug.repr', + 'werkzeug.debug.tbtools'] + + def test_html_builder(self): + html = utils.html + xhtml = utils.xhtml + assert html.p('Hello World') == '<p>Hello World</p>' + assert html.a('Test', href='#') == '<a href="#">Test</a>' + assert html.br() == '<br>' + assert xhtml.br() == '<br />' + assert html.img(src='foo') == '<img src="foo">' + assert xhtml.img(src='foo') == '<img src="foo" />' + assert html.html( + html.head( + html.title('foo'), + html.script(type='text/javascript') + ) + ) == '<html><head><title>foo</title><script type="text/javascript">' \ + '</script></head></html>' + assert html('<foo>') == '<foo>' + assert html.input(disabled=True) == '<input disabled>' + assert xhtml.input(disabled=True) == '<input disabled="disabled" />' + assert html.input(disabled='') == '<input>' + assert xhtml.input(disabled='') == '<input />' + assert html.input(disabled=None) == '<input>' + assert xhtml.input(disabled=None) == '<input />' + assert html.script('alert("Hello World");') == '<script>' \ + 'alert("Hello World");</script>' + assert xhtml.script('alert("Hello World");') == '<script>' \ + '/*<![CDATA[*/alert("Hello World");/*]]>*/</script>' + + def test_validate_arguments(self): + take_none = lambda: None + take_two = lambda a, b: None + take_two_one_default = lambda a, b=0: None + + assert utils.validate_arguments(take_two, (1, 2,), {}) == ((1, 2), {}) + assert utils.validate_arguments(take_two, (1,), {'b': 2}) == ((1, 2), {}) + assert utils.validate_arguments(take_two_one_default, (1,), {}) == ((1, 0), {}) + assert utils.validate_arguments(take_two_one_default, (1, 2), {}) == ((1, 2), {}) + + self.assert_raises(utils.ArgumentValidationError, + utils.validate_arguments, take_two, (), {}) + + assert utils.validate_arguments(take_none, (1, 2,), {'c': 3}) == ((), {}) + self.assert_raises(utils.ArgumentValidationError, + utils.validate_arguments, take_none, (1,), {}, drop_extra=False) + self.assert_raises(utils.ArgumentValidationError, + utils.validate_arguments, take_none, (), {'a': 1}, drop_extra=False) + + def test_header_set_duplication_bug(self): + headers = Headers([ + ('Content-Type', 'text/html'), + ('Foo', 'bar'), + ('Blub', 'blah') + ]) + headers['blub'] = 'hehe' + headers['blafasel'] = 'humm' + assert headers == Headers([ + ('Content-Type', 'text/html'), + ('Foo', 'bar'), + ('blub', 'hehe'), + ('blafasel', 'humm') + ]) + + def test_append_slash_redirect(self): + def app(env, sr): + return utils.append_slash_redirect(env)(env, sr) + client = Client(app, BaseResponse) + response = client.get('foo', base_url='http://example.org/app') + assert response.status_code == 301 + assert response.headers['Location'] == 'http://example.org/app/foo/' + + def test_cached_property_doc(self): + @utils.cached_property + def foo(): + """testing""" + return 42 + assert foo.__doc__ == 'testing' + assert foo.__name__ == 'foo' + assert foo.__module__ == __name__ + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(GeneralUtilityTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/wrappers.py b/websdk/werkzeug/testsuite/wrappers.py new file mode 100644 index 0000000..b18f891 --- /dev/null +++ b/websdk/werkzeug/testsuite/wrappers.py @@ -0,0 +1,662 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.wrappers + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests for the response and request objects. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +import pickle +from StringIO import StringIO +from datetime import datetime + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug import wrappers +from werkzeug.datastructures import MultiDict, ImmutableOrderedMultiDict, \ + ImmutableList, ImmutableTypeConversionDict, CharsetAccept, \ + CombinedMultiDict +from werkzeug.test import Client, create_environ, run_wsgi_app + + +class RequestTestResponse(wrappers.BaseResponse): + """Subclass of the normal response class we use to test response + and base classes. Has some methods to test if things in the + response match. + """ + + def __init__(self, response, status, headers): + wrappers.BaseResponse.__init__(self, response, status, headers) + self.body_data = pickle.loads(self.data) + + def __getitem__(self, key): + return self.body_data[key] + + +def request_demo_app(environ, start_response): + request = wrappers.BaseRequest(environ) + assert 'werkzeug.request' in environ + start_response('200 OK', [('Content-Type', 'text/plain')]) + return [pickle.dumps({ + 'args': request.args, + 'args_as_list': request.args.lists(), + 'form': request.form, + 'form_as_list': request.form.lists(), + 'environ': prepare_environ_pickle(request.environ), + 'data': request.data + })] + + +def prepare_environ_pickle(environ): + result = {} + for key, value in environ.iteritems(): + try: + pickle.dumps((key, value)) + except Exception: + continue + result[key] = value + return result + + +class WrappersTestCase(WerkzeugTestCase): + + def assert_environ(self, environ, method): + assert environ['REQUEST_METHOD'] == method + assert environ['PATH_INFO'] == '/' + assert environ['SCRIPT_NAME'] == '' + assert environ['SERVER_NAME'] == 'localhost' + assert environ['wsgi.version'] == (1, 0) + assert environ['wsgi.url_scheme'] == 'http' + + def test_base_request(self): + client = Client(request_demo_app, RequestTestResponse) + + # get requests + response = client.get('/?foo=bar&foo=hehe') + assert response['args'] == MultiDict([('foo', 'bar'), ('foo', 'hehe')]) + assert response['args_as_list'] == [('foo', ['bar', 'hehe'])] + assert response['form'] == MultiDict() + assert response['form_as_list'] == [] + assert response['data'] == '' + self.assert_environ(response['environ'], 'GET') + + # post requests with form data + response = client.post('/?blub=blah', data='foo=blub+hehe&blah=42', + content_type='application/x-www-form-urlencoded') + assert response['args'] == MultiDict([('blub', 'blah')]) + assert response['args_as_list'] == [('blub', ['blah'])] + assert response['form'] == MultiDict([('foo', 'blub hehe'), ('blah', '42')]) + assert response['data'] == '' + # currently we do not guarantee that the values are ordered correctly + # for post data. + ## assert response['form_as_list'] == [('foo', ['blub hehe']), ('blah', ['42'])] + self.assert_environ(response['environ'], 'POST') + + # patch requests with form data + response = client.patch('/?blub=blah', data='foo=blub+hehe&blah=42', + content_type='application/x-www-form-urlencoded') + assert response['args'] == MultiDict([('blub', 'blah')]) + assert response['args_as_list'] == [('blub', ['blah'])] + assert response['form'] == MultiDict([('foo', 'blub hehe'), ('blah', '42')]) + assert response['data'] == '' + self.assert_environ(response['environ'], 'PATCH') + + # post requests with json data + json = '{"foo": "bar", "blub": "blah"}' + response = client.post('/?a=b', data=json, content_type='application/json') + assert response['data'] == json + assert response['args'] == MultiDict([('a', 'b')]) + assert response['form'] == MultiDict() + + def test_access_route(self): + req = wrappers.Request.from_values(headers={ + 'X-Forwarded-For': '192.168.1.2, 192.168.1.1' + }) + req.environ['REMOTE_ADDR'] = '192.168.1.3' + assert req.access_route == ['192.168.1.2', '192.168.1.1'] + assert req.remote_addr == '192.168.1.3' + + req = wrappers.Request.from_values() + req.environ['REMOTE_ADDR'] = '192.168.1.3' + assert req.access_route == ['192.168.1.3'] + + def test_url_request_descriptors(self): + req = wrappers.Request.from_values('/bar?foo=baz', 'http://example.com/test') + assert req.path == u'/bar' + assert req.script_root == u'/test' + assert req.url == 'http://example.com/test/bar?foo=baz' + assert req.base_url == 'http://example.com/test/bar' + assert req.url_root == 'http://example.com/test/' + assert req.host_url == 'http://example.com/' + assert req.host == 'example.com' + assert req.scheme == 'http' + + req = wrappers.Request.from_values('/bar?foo=baz', 'https://example.com/test') + assert req.scheme == 'https' + + def test_authorization_mixin(self): + request = wrappers.Request.from_values(headers={ + 'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==' + }) + a = request.authorization + assert a.type == 'basic' + assert a.username == 'Aladdin' + assert a.password == 'open sesame' + + def test_base_response(self): + # unicode + response = wrappers.BaseResponse(u'öäü') + assert response.data == 'öäü' + + # writing + response = wrappers.Response('foo') + response.stream.write('bar') + assert response.data == 'foobar' + + # set cookie + response = wrappers.BaseResponse() + response.set_cookie('foo', 'bar', 60, 0, '/blub', 'example.org', False) + assert response.headers.to_list() == [ + ('Content-Type', 'text/plain; charset=utf-8'), + ('Set-Cookie', 'foo=bar; Domain=example.org; expires=Thu, ' + '01-Jan-1970 00:00:00 GMT; Max-Age=60; Path=/blub') + ] + + # delete cookie + response = wrappers.BaseResponse() + response.delete_cookie('foo') + assert response.headers.to_list() == [ + ('Content-Type', 'text/plain; charset=utf-8'), + ('Set-Cookie', 'foo=; expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/') + ] + + # close call forwarding + closed = [] + class Iterable(object): + def next(self): + raise StopIteration() + def __iter__(self): + return self + def close(self): + closed.append(True) + response = wrappers.BaseResponse(Iterable()) + response.call_on_close(lambda: closed.append(True)) + app_iter, status, headers = run_wsgi_app(response, + create_environ(), + buffered=True) + assert status == '200 OK' + assert ''.join(app_iter) == '' + assert len(closed) == 2 + + def test_response_status_codes(self): + response = wrappers.BaseResponse() + response.status_code = 404 + assert response.status == '404 NOT FOUND' + response.status = '200 OK' + assert response.status_code == 200 + response.status = '999 WTF' + assert response.status_code == 999 + response.status_code = 588 + assert response.status_code == 588 + assert response.status == '588 UNKNOWN' + response.status = 'wtf' + assert response.status_code == 0 + + def test_type_forcing(self): + def wsgi_application(environ, start_response): + start_response('200 OK', [('Content-Type', 'text/html')]) + return ['Hello World!'] + base_response = wrappers.BaseResponse('Hello World!', content_type='text/html') + + class SpecialResponse(wrappers.Response): + def foo(self): + return 42 + + # good enough for this simple application, but don't ever use that in + # real world examples! + fake_env = {} + + for orig_resp in wsgi_application, base_response: + response = SpecialResponse.force_type(orig_resp, fake_env) + assert response.__class__ is SpecialResponse + assert response.foo() == 42 + assert response.data == 'Hello World!' + assert response.content_type == 'text/html' + + # without env, no arbitrary conversion + self.assert_raises(TypeError, SpecialResponse.force_type, wsgi_application) + + def test_accept_mixin(self): + request = wrappers.Request({ + 'HTTP_ACCEPT': 'text/xml,application/xml,application/xhtml+xml,' + 'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5', + 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', + 'HTTP_ACCEPT_ENCODING': 'gzip,deflate', + 'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5' + }) + assert request.accept_mimetypes == CharsetAccept([ + ('text/xml', 1), ('image/png', 1), ('application/xml', 1), + ('application/xhtml+xml', 1), ('text/html', 0.9), + ('text/plain', 0.8), ('*/*', 0.5) + ]) + assert request.accept_charsets == CharsetAccept([ + ('ISO-8859-1', 1), ('utf-8', 0.7), ('*', 0.7) + ]) + assert request.accept_encodings == CharsetAccept([('gzip', 1), ('deflate', 1)]) + assert request.accept_languages == CharsetAccept([('en-us', 1), ('en', 0.5)]) + + request = wrappers.Request({'HTTP_ACCEPT': ''}) + assert request.accept_mimetypes == CharsetAccept() + + def test_etag_request_mixin(self): + request = wrappers.Request({ + 'HTTP_CACHE_CONTROL': 'no-store, no-cache', + 'HTTP_IF_MATCH': 'w/"foo", bar, "baz"', + 'HTTP_IF_NONE_MATCH': 'w/"foo", bar, "baz"', + 'HTTP_IF_MODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT', + 'HTTP_IF_UNMODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT' + }) + assert request.cache_control.no_store + assert request.cache_control.no_cache + + for etags in request.if_match, request.if_none_match: + assert etags('bar') + assert etags.contains_raw('w/"foo"') + assert etags.contains_weak('foo') + assert not etags.contains('foo') + + assert request.if_modified_since == datetime(2008, 1, 22, 11, 18, 44) + assert request.if_unmodified_since == datetime(2008, 1, 22, 11, 18, 44) + + def test_user_agent_mixin(self): + user_agents = [ + ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) ' + 'Gecko/20071127 Firefox/2.0.0.11', 'firefox', 'macos', '2.0.0.11', + 'en-US'), + ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54', + 'opera', 'windows', '8.54', 'de-DE'), + ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 ' + '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3', + 'safari', 'iphone', '419.3', 'en'), + ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)', + 'google', None, '2.1', None) + ] + for ua, browser, platform, version, lang in user_agents: + request = wrappers.Request({'HTTP_USER_AGENT': ua}) + assert request.user_agent.browser == browser + assert request.user_agent.platform == platform + assert request.user_agent.version == version + assert request.user_agent.language == lang + assert bool(request.user_agent) + assert request.user_agent.to_header() == ua + assert str(request.user_agent) == ua + + request = wrappers.Request({'HTTP_USER_AGENT': 'foo'}) + assert not request.user_agent + + def test_etag_response_mixin(self): + response = wrappers.Response('Hello World') + assert response.get_etag() == (None, None) + response.add_etag() + assert response.get_etag() == ('b10a8db164e0754105b7a99be72e3fe5', False) + assert not response.cache_control + response.cache_control.must_revalidate = True + response.cache_control.max_age = 60 + response.headers['Content-Length'] = len(response.data) + assert response.headers['Cache-Control'] == 'must-revalidate, max-age=60' + + assert 'date' not in response.headers + env = create_environ() + env.update({ + 'REQUEST_METHOD': 'GET', + 'HTTP_IF_NONE_MATCH': response.get_etag()[0] + }) + response.make_conditional(env) + assert 'date' in response.headers + + # after the thing is invoked by the server as wsgi application + # (we're emulating this here), there must not be any entity + # headers left and the status code would have to be 304 + resp = wrappers.Response.from_app(response, env) + assert resp.status_code == 304 + assert not 'content-length' in resp.headers + + # make sure date is not overriden + response = wrappers.Response('Hello World') + response.date = 1337 + d = response.date + response.make_conditional(env) + assert response.date == d + + # make sure content length is only set if missing + response = wrappers.Response('Hello World') + response.content_length = 999 + response.make_conditional(env) + self.assert_equal(response.content_length, 999) + + def test_etag_response_mixin_freezing(self): + class WithFreeze(wrappers.ETagResponseMixin, wrappers.BaseResponse): + pass + class WithoutFreeze(wrappers.BaseResponse, wrappers.ETagResponseMixin): + pass + + response = WithFreeze('Hello World') + response.freeze() + assert response.get_etag() == (wrappers.generate_etag('Hello World'), False) + response = WithoutFreeze('Hello World') + response.freeze() + assert response.get_etag() == (None, None) + response = wrappers.Response('Hello World') + response.freeze() + assert response.get_etag() == (None, None) + + def test_authenticate_mixin(self): + resp = wrappers.Response() + resp.www_authenticate.type = 'basic' + resp.www_authenticate.realm = 'Testing' + assert resp.headers['WWW-Authenticate'] == 'Basic realm="Testing"' + resp.www_authenticate.realm = None + resp.www_authenticate.type = None + assert 'WWW-Authenticate' not in resp.headers + + def test_response_stream_mixin(self): + response = wrappers.Response() + response.stream.write('Hello ') + response.stream.write('World!') + assert response.response == ['Hello ', 'World!'] + assert response.data == 'Hello World!' + + def test_common_response_descriptors_mixin(self): + response = wrappers.Response() + response.mimetype = 'text/html' + assert response.mimetype == 'text/html' + assert response.content_type == 'text/html; charset=utf-8' + assert response.mimetype_params == {'charset': 'utf-8'} + response.mimetype_params['x-foo'] = 'yep' + del response.mimetype_params['charset'] + assert response.content_type == 'text/html; x-foo=yep' + + now = datetime.utcnow().replace(microsecond=0) + + assert response.content_length is None + response.content_length = '42' + assert response.content_length == 42 + + for attr in 'date', 'age', 'expires': + assert getattr(response, attr) is None + setattr(response, attr, now) + assert getattr(response, attr) == now + + assert response.retry_after is None + response.retry_after = now + assert response.retry_after == now + + assert not response.vary + response.vary.add('Cookie') + response.vary.add('Content-Language') + assert 'cookie' in response.vary + assert response.vary.to_header() == 'Cookie, Content-Language' + response.headers['Vary'] = 'Content-Encoding' + assert response.vary.as_set() == set(['content-encoding']) + + response.allow.update(['GET', 'POST']) + assert response.headers['Allow'] == 'GET, POST' + + response.content_language.add('en-US') + response.content_language.add('fr') + assert response.headers['Content-Language'] == 'en-US, fr' + + + def test_common_request_descriptors_mixin(self): + request = wrappers.Request.from_values(content_type='text/html; charset=utf-8', + content_length='23', + headers={ + 'Referer': 'http://www.example.com/', + 'Date': 'Sat, 28 Feb 2009 19:04:35 GMT', + 'Max-Forwards': '10', + 'Pragma': 'no-cache' + }) + + assert request.content_type == 'text/html; charset=utf-8' + assert request.mimetype == 'text/html' + assert request.mimetype_params == {'charset': 'utf-8'} + assert request.content_length == 23 + assert request.referrer == 'http://www.example.com/' + assert request.date == datetime(2009, 2, 28, 19, 4, 35) + assert request.max_forwards == 10 + assert 'no-cache' in request.pragma + + def test_shallow_mode(self): + """Request object shallow mode""" + request = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True) + assert request.args['foo'] == 'bar' + self.assert_raises(RuntimeError, lambda: request.form['foo']) + + def test_form_parsing_failed(self): + data = ( + '--blah\r\n' + ) + data = wrappers.Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='multipart/form-data; boundary=foo', + method='POST') + assert not data.files + assert not data.form + + def test_url_charset_reflection(self): + req = wrappers.Request.from_values() + req.charset = 'utf-7' + assert req.url_charset == 'utf-7' + + def test_response_streamed(self): + r = wrappers.Response() + assert not r.is_streamed + r = wrappers.Response("Hello World") + assert not r.is_streamed + r = wrappers.Response(["foo", "bar"]) + assert not r.is_streamed + def gen(): + if 0: + yield None + r = wrappers.Response(gen()) + assert r.is_streamed + + def test_response_freeze(self): + def generate(): + yield "foo" + yield "bar" + resp = wrappers.Response(generate()) + resp.freeze() + assert resp.response == ['foo', 'bar'] + assert resp.headers['content-length'] == '6' + + def test_other_method_payload(self): + data = 'Hello World' + req = wrappers.Request.from_values(input_stream=StringIO(data), + content_length=len(data), + content_type='text/plain', + method='WHAT_THE_FUCK') + assert req.data == data + assert isinstance(req.stream, wrappers.LimitedStream) + + def test_urlfication(self): + resp = wrappers.Response() + resp.headers['Location'] = u'http://üser:pässword@☃.net/påth' + resp.headers['Content-Location'] = u'http://☃.net/' + headers = resp.get_wsgi_headers(create_environ()) + assert headers['location'] == \ + 'http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th' + assert headers['content-location'] == 'http://xn--n3h.net/' + + def test_new_response_iterator_behavior(self): + req = wrappers.Request.from_values() + resp = wrappers.Response(u'Hello Wörld!') + + def get_content_length(resp): + headers = wrappers.Headers.linked(resp.get_wsgi_headers(req.environ)) + return headers.get('content-length', type=int) + + def generate_items(): + yield "Hello " + yield u"Wörld!" + + # werkzeug encodes when set to `data` now, which happens + # if a string is passed to the response object. + assert resp.response == [u'Hello Wörld!'.encode('utf-8')] + assert resp.data == u'Hello Wörld!'.encode('utf-8') + assert get_content_length(resp) == 13 + assert not resp.is_streamed + assert resp.is_sequence + + # try the same for manual assignment + resp.data = u'Wörd' + assert resp.response == [u'Wörd'.encode('utf-8')] + assert resp.data == u'Wörd'.encode('utf-8') + assert get_content_length(resp) == 5 + assert not resp.is_streamed + assert resp.is_sequence + + # automatic generator sequence conversion + resp.response = generate_items() + assert resp.is_streamed + assert not resp.is_sequence + assert resp.data == u'Hello Wörld!'.encode('utf-8') + assert resp.response == ['Hello ', u'Wörld!'.encode('utf-8')] + assert not resp.is_streamed + assert resp.is_sequence + + # automatic generator sequence conversion + resp.response = generate_items() + resp.implicit_sequence_conversion = False + assert resp.is_streamed + assert not resp.is_sequence + self.assert_raises(RuntimeError, lambda: resp.data) + resp.make_sequence() + assert resp.data == u'Hello Wörld!'.encode('utf-8') + assert resp.response == ['Hello ', u'Wörld!'.encode('utf-8')] + assert not resp.is_streamed + assert resp.is_sequence + + # stream makes it a list no matter how the conversion is set + for val in True, False: + resp.implicit_sequence_conversion = val + resp.response = ("foo", "bar") + assert resp.is_sequence + resp.stream.write('baz') + assert resp.response == ['foo', 'bar', 'baz'] + + def test_form_data_ordering(self): + class MyRequest(wrappers.Request): + parameter_storage_class = ImmutableOrderedMultiDict + + req = MyRequest.from_values('/?foo=1&bar=0&foo=3') + assert list(req.args) == ['foo', 'bar'] + assert req.args.items(multi=True) == [ + ('foo', '1'), + ('bar', '0'), + ('foo', '3') + ] + assert isinstance(req.args, ImmutableOrderedMultiDict) + assert isinstance(req.values, CombinedMultiDict) + assert req.values['foo'] == '1' + assert req.values.getlist('foo') == ['1', '3'] + + def test_storage_classes(self): + class MyRequest(wrappers.Request): + dict_storage_class = dict + list_storage_class = list + parameter_storage_class = dict + req = MyRequest.from_values('/?foo=baz', headers={ + 'Cookie': 'foo=bar' + }) + assert type(req.cookies) is dict + assert req.cookies == {'foo': 'bar'} + assert type(req.access_route) is list + + assert type(req.args) is dict + assert type(req.values) is CombinedMultiDict + assert req.values['foo'] == 'baz' + + req = wrappers.Request.from_values(headers={ + 'Cookie': 'foo=bar' + }) + assert type(req.cookies) is ImmutableTypeConversionDict + assert req.cookies == {'foo': 'bar'} + assert type(req.access_route) is ImmutableList + + MyRequest.list_storage_class = tuple + req = MyRequest.from_values() + assert type(req.access_route) is tuple + + def test_response_headers_passthrough(self): + headers = wrappers.Headers() + resp = wrappers.Response(headers=headers) + assert resp.headers is headers + + def test_response_304_no_content_length(self): + resp = wrappers.Response('Test', status=304) + env = create_environ() + assert 'content-length' not in resp.get_wsgi_headers(env) + + def test_ranges(self): + # basic range stuff + req = wrappers.Request.from_values() + assert req.range is None + req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'}) + assert req.range.ranges == [(0, 500)] + + resp = wrappers.Response() + resp.content_range = req.range.make_content_range(1000) + assert resp.content_range.units == 'bytes' + assert resp.content_range.start == 0 + assert resp.content_range.stop == 500 + assert resp.content_range.length == 1000 + assert resp.headers['Content-Range'] == 'bytes 0-499/1000' + + resp.content_range.unset() + assert 'Content-Range' not in resp.headers + + resp.headers['Content-Range'] = 'bytes 0-499/1000' + assert resp.content_range.units == 'bytes' + assert resp.content_range.start == 0 + assert resp.content_range.stop == 500 + assert resp.content_range.length == 1000 + + def test_auto_content_length(self): + resp = wrappers.Response('Hello World!') + assert resp.content_length == 12 + + resp = wrappers.Response(['Hello World!']) + assert resp.content_length is None + assert resp.get_wsgi_headers({})['Content-Length'] == '12' + + def test_disabled_auto_content_length(self): + class MyResponse(wrappers.Response): + automatically_set_content_length = False + resp = MyResponse('Hello World!') + self.assert_(resp.content_length is None) + + resp = MyResponse(['Hello World!']) + self.assert_(resp.content_length is None) + self.assert_('Content-Length' not in resp.get_wsgi_headers({})) + + def test_location_header_autocorrect(self): + env = create_environ() + class MyResponse(wrappers.Response): + autocorrect_location_header = False + resp = MyResponse('Hello World!') + resp.headers['Location'] = '/test' + self.assert_equal(resp.get_wsgi_headers(env)['Location'], '/test') + + resp = wrappers.Response('Hello World!') + resp.headers['Location'] = '/test' + self.assert_equal(resp.get_wsgi_headers(env)['Location'], 'http://localhost/test') + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(WrappersTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/wsgi.py b/websdk/werkzeug/testsuite/wsgi.py new file mode 100644 index 0000000..6602fa2 --- /dev/null +++ b/websdk/werkzeug/testsuite/wsgi.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.wsgi + ~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the WSGI utilities. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest +from os import path +from cStringIO import StringIO + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.wrappers import BaseResponse +from werkzeug.exceptions import BadRequest, ClientDisconnected +from werkzeug.test import Client, create_environ, run_wsgi_app +from werkzeug import wsgi + + +class WSGIUtilsTestCase(WerkzeugTestCase): + + def test_shareddatamiddleware_get_file_loader(self): + app = wsgi.SharedDataMiddleware(None, {}) + assert callable(app.get_file_loader('foo')) + + def test_shared_data_middleware(self): + def null_application(environ, start_response): + start_response('404 NOT FOUND', [('Content-Type', 'text/plain')]) + yield 'NOT FOUND' + app = wsgi.SharedDataMiddleware(null_application, { + '/': path.join(path.dirname(__file__), 'res'), + '/sources': path.join(path.dirname(__file__), 'res'), + '/pkg': ('werkzeug.debug', 'shared') + }) + + for p in '/test.txt', '/sources/test.txt': + app_iter, status, headers = run_wsgi_app(app, create_environ(p)) + assert status == '200 OK' + assert ''.join(app_iter).strip() == 'FOUND' + + app_iter, status, headers = run_wsgi_app(app, create_environ('/pkg/debugger.js')) + contents = ''.join(app_iter) + assert '$(function() {' in contents + + app_iter, status, headers = run_wsgi_app(app, create_environ('/missing')) + assert status == '404 NOT FOUND' + assert ''.join(app_iter).strip() == 'NOT FOUND' + + def test_get_host(self): + env = {'HTTP_X_FORWARDED_HOST': 'example.org', + 'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'} + assert wsgi.get_host(env) == 'example.org' + assert wsgi.get_host(create_environ('/', 'http://example.org')) \ + == 'example.org' + + def test_responder(self): + def foo(environ, start_response): + return BaseResponse('Test') + client = Client(wsgi.responder(foo), BaseResponse) + response = client.get('/') + assert response.status_code == 200 + assert response.data == 'Test' + + def test_pop_path_info(self): + original_env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b///c'} + + # regular path info popping + def assert_tuple(script_name, path_info): + assert env.get('SCRIPT_NAME') == script_name + assert env.get('PATH_INFO') == path_info + env = original_env.copy() + pop = lambda: wsgi.pop_path_info(env) + + assert_tuple('/foo', '/a/b///c') + assert pop() == 'a' + assert_tuple('/foo/a', '/b///c') + assert pop() == 'b' + assert_tuple('/foo/a/b', '///c') + assert pop() == 'c' + assert_tuple('/foo/a/b///c', '') + assert pop() is None + + def test_peek_path_info(self): + env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/aaa/b///c'} + + assert wsgi.peek_path_info(env) == 'aaa' + assert wsgi.peek_path_info(env) == 'aaa' + + def test_limited_stream(self): + class RaisingLimitedStream(wsgi.LimitedStream): + def on_exhausted(self): + raise BadRequest('input stream exhausted') + + io = StringIO('123456') + stream = RaisingLimitedStream(io, 3) + assert stream.read() == '123' + self.assert_raises(BadRequest, stream.read) + + io = StringIO('123456') + stream = RaisingLimitedStream(io, 3) + assert stream.read(1) == '1' + assert stream.read(1) == '2' + assert stream.read(1) == '3' + self.assert_raises(BadRequest, stream.read) + + io = StringIO('123456\nabcdefg') + stream = wsgi.LimitedStream(io, 9) + assert stream.readline() == '123456\n' + assert stream.readline() == 'ab' + + io = StringIO('123456\nabcdefg') + stream = wsgi.LimitedStream(io, 9) + assert stream.readlines() == ['123456\n', 'ab'] + + io = StringIO('123456\nabcdefg') + stream = wsgi.LimitedStream(io, 9) + assert stream.readlines(2) == ['12'] + assert stream.readlines(2) == ['34'] + assert stream.readlines() == ['56\n', 'ab'] + + io = StringIO('123456\nabcdefg') + stream = wsgi.LimitedStream(io, 9) + assert stream.readline(100) == '123456\n' + + io = StringIO('123456\nabcdefg') + stream = wsgi.LimitedStream(io, 9) + assert stream.readlines(100) == ['123456\n', 'ab'] + + io = StringIO('123456') + stream = wsgi.LimitedStream(io, 3) + assert stream.read(1) == '1' + assert stream.read(1) == '2' + assert stream.read() == '3' + assert stream.read() == '' + + io = StringIO('123456') + stream = wsgi.LimitedStream(io, 3) + assert stream.read(-1) == '123' + + def test_limited_stream_disconnection(self): + io = StringIO('A bit of content') + + # disconnect detection on out of bytes + stream = wsgi.LimitedStream(io, 255) + with self.assert_raises(ClientDisconnected): + stream.read() + + # disconnect detection because file close + io = StringIO('x' * 255) + io.close() + stream = wsgi.LimitedStream(io, 255) + with self.assert_raises(ClientDisconnected): + stream.read() + + def test_path_info_extraction(self): + x = wsgi.extract_path_info('http://example.com/app', '/app/hello') + assert x == u'/hello' + x = wsgi.extract_path_info('http://example.com/app', + 'https://example.com/app/hello') + assert x == u'/hello' + x = wsgi.extract_path_info('http://example.com/app/', + 'https://example.com/app/hello') + assert x == u'/hello' + x = wsgi.extract_path_info('http://example.com/app/', + 'https://example.com/app') + assert x == u'/' + x = wsgi.extract_path_info(u'http://☃.net/', u'/fööbär') + assert x == u'/fööbär' + x = wsgi.extract_path_info(u'http://☃.net/x', u'http://☃.net/x/fööbär') + assert x == u'/fööbär' + + env = create_environ(u'/fööbär', u'http://☃.net/x/') + x = wsgi.extract_path_info(env, u'http://☃.net/x/fööbär') + assert x == u'/fööbär' + + x = wsgi.extract_path_info('http://example.com/app/', + 'https://example.com/a/hello') + assert x is None + x = wsgi.extract_path_info('http://example.com/app/', + 'https://example.com/app/hello', + collapse_http_schemes=False) + assert x is None + + def test_get_host_fallback(self): + assert wsgi.get_host({ + 'SERVER_NAME': 'foobar.example.com', + 'wsgi.url_scheme': 'http', + 'SERVER_PORT': '80' + }) == 'foobar.example.com' + assert wsgi.get_host({ + 'SERVER_NAME': 'foobar.example.com', + 'wsgi.url_scheme': 'http', + 'SERVER_PORT': '81' + }) == 'foobar.example.com:81' + + def test_multi_part_line_breaks(self): + data = 'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK' + test_stream = StringIO(data) + lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=16)) + assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n', 'ABCDEFGHIJK'] + + data = 'abc\r\nThis line is broken by the buffer length.\r\nFoo bar baz' + test_stream = StringIO(data) + lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=24)) + assert lines == ['abc\r\n', 'This line is broken by the buffer length.\r\n', 'Foo bar baz'] + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(WSGIUtilsTestCase)) + return suite |