Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/websdk/werkzeug/testsuite
diff options
context:
space:
mode:
Diffstat (limited to 'websdk/werkzeug/testsuite')
-rw-r--r--websdk/werkzeug/testsuite/__init__.py146
-rw-r--r--websdk/werkzeug/testsuite/compat.py58
-rw-r--r--websdk/werkzeug/testsuite/contrib/__init__.py19
-rw-r--r--websdk/werkzeug/testsuite/contrib/cache.py152
-rw-r--r--websdk/werkzeug/testsuite/contrib/fixers.py187
-rw-r--r--websdk/werkzeug/testsuite/contrib/iterio.py72
-rw-r--r--websdk/werkzeug/testsuite/contrib/securecookie.py65
-rw-r--r--websdk/werkzeug/testsuite/contrib/sessions.py81
-rw-r--r--websdk/werkzeug/testsuite/contrib/wrappers.py97
-rw-r--r--websdk/werkzeug/testsuite/datastructures.py618
-rw-r--r--websdk/werkzeug/testsuite/debug.py163
-rw-r--r--websdk/werkzeug/testsuite/exceptions.py87
-rw-r--r--websdk/werkzeug/testsuite/formparser.py370
-rw-r--r--websdk/werkzeug/testsuite/http.py392
-rw-r--r--websdk/werkzeug/testsuite/internal.py81
-rw-r--r--websdk/werkzeug/testsuite/local.py133
-rw-r--r--websdk/werkzeug/testsuite/multipart/collect.py56
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.pngbin0 -> 523 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.pngbin0 -> 703 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txtbin0 -> 1739 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt1
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.pngbin0 -> 781 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.pngbin0 -> 733 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txtbin0 -> 2042 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt3
-rw-r--r--websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.pngbin0 -> 523 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.pngbin0 -> 703 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txtbin0 -> 1798 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt1
-rw-r--r--websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txtbin0 -> 30044 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.pngbin0 -> 582 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.pngbin0 -> 733 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txtbin0 -> 1740 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt1
-rw-r--r--websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.pngbin0 -> 1002 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.pngbin0 -> 952 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txtbin0 -> 2408 bytes
-rw-r--r--websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt1
-rw-r--r--websdk/werkzeug/testsuite/res/test.txt1
-rw-r--r--websdk/werkzeug/testsuite/routing.py610
-rw-r--r--websdk/werkzeug/testsuite/security.py57
-rw-r--r--websdk/werkzeug/testsuite/serving.py84
-rw-r--r--websdk/werkzeug/testsuite/test.py376
-rw-r--r--websdk/werkzeug/testsuite/urls.py168
-rw-r--r--websdk/werkzeug/testsuite/utils.py255
-rw-r--r--websdk/werkzeug/testsuite/wrappers.py662
-rw-r--r--websdk/werkzeug/testsuite/wsgi.py217
47 files changed, 5214 insertions, 0 deletions
diff --git a/websdk/werkzeug/testsuite/__init__.py b/websdk/werkzeug/testsuite/__init__.py
new file mode 100644
index 0000000..e1c8acd
--- /dev/null
+++ b/websdk/werkzeug/testsuite/__init__.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite
+ ~~~~~~~~~~~~~~~~~~
+
+ Contains all test Werkzeug tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+from werkzeug.utils import import_string, find_modules
+
+
+def iter_suites(package):
+ """Yields all testsuites."""
+ for module in find_modules(package, include_packages=True):
+ mod = import_string(module)
+ if hasattr(mod, 'suite'):
+ yield mod.suite()
+
+
+def find_all_tests(suite):
+ """Yields all the tests and their names from a given suite."""
+ suites = [suite]
+ while suites:
+ s = suites.pop()
+ try:
+ suites.extend(s)
+ except TypeError:
+ yield s, '%s.%s.%s' % (
+ s.__class__.__module__,
+ s.__class__.__name__,
+ s._testMethodName
+ )
+
+
+class WerkzeugTestCase(unittest.TestCase):
+ """Baseclass for all the tests that Werkzeug uses. Use these
+ methods for testing instead of the camelcased ones in the
+ baseclass for consistency.
+ """
+
+ def setup(self):
+ pass
+
+ def teardown(self):
+ pass
+
+ def setUp(self):
+ self.setup()
+
+ def tearDown(self):
+ unittest.TestCase.tearDown(self)
+ self.teardown()
+
+ def assert_equal(self, x, y):
+ return self.assertEqual(x, y)
+
+ def assert_not_equal(self, x, y):
+ return self.assertNotEqual(x, y)
+
+ def assert_raises(self, exc_type, callable=None, *args, **kwargs):
+ catcher = _ExceptionCatcher(self, exc_type)
+ if callable is None:
+ return catcher
+ with catcher:
+ callable(*args, **kwargs)
+
+
+class _ExceptionCatcher(object):
+
+ def __init__(self, test_case, exc_type):
+ self.test_case = test_case
+ self.exc_type = exc_type
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ exception_name = self.exc_type.__name__
+ if exc_type is None:
+ self.test_case.fail('Expected exception of type %r' %
+ exception_name)
+ elif not issubclass(exc_type, self.exc_type):
+ raise exc_type, exc_value, tb
+ return True
+
+
+class BetterLoader(unittest.TestLoader):
+ """A nicer loader that solves two problems. First of all we are setting
+ up tests from different sources and we're doing this programmatically
+ which breaks the default loading logic so this is required anyways.
+ Secondly this loader has a nicer interpolation for test names than the
+ default one so you can just do ``run-tests.py ViewTestCase`` and it
+ will work.
+ """
+
+ def getRootSuite(self):
+ return suite()
+
+ def loadTestsFromName(self, name, module=None):
+ root = self.getRootSuite()
+ if name == 'suite':
+ return root
+
+ all_tests = []
+ for testcase, testname in find_all_tests(root):
+ if testname == name or \
+ testname.endswith('.' + name) or \
+ ('.' + name + '.') in testname or \
+ testname.startswith(name + '.'):
+ all_tests.append(testcase)
+
+ if not all_tests:
+ raise LookupError('could not find test case for "%s"' % name)
+
+ if len(all_tests) == 1:
+ return all_tests[0]
+ rv = unittest.TestSuite()
+ for test in all_tests:
+ rv.addTest(test)
+ return rv
+
+
+def suite():
+ """A testsuite that has all the Flask tests. You can use this
+ function to integrate the Flask tests into your own testsuite
+ in case you want to test that monkeypatches to Flask do not
+ break it.
+ """
+ suite = unittest.TestSuite()
+ for other_suite in iter_suites(__name__):
+ suite.addTest(other_suite)
+ return suite
+
+
+def main():
+ """Runs the testsuite as command line application."""
+ try:
+ unittest.main(testLoader=BetterLoader(), defaultTest='suite')
+ except Exception, e:
+ print 'Error: %s' % e
diff --git a/websdk/werkzeug/testsuite/compat.py b/websdk/werkzeug/testsuite/compat.py
new file mode 100644
index 0000000..f5487c3
--- /dev/null
+++ b/websdk/werkzeug/testsuite/compat.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.compat
+ ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Ensure that old stuff does not break on update.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+import warnings
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.wrappers import Response
+from werkzeug.test import create_environ
+
+
+class CompatTestCase(WerkzeugTestCase):
+
+ def test_old_imports(self):
+ from werkzeug.utils import Headers, MultiDict, CombinedMultiDict, \
+ Headers, EnvironHeaders
+ from werkzeug.http import Accept, MIMEAccept, CharsetAccept, \
+ LanguageAccept, ETags, HeaderSet, WWWAuthenticate, \
+ Authorization
+
+ def test_exposed_werkzeug_mod(self):
+ import werkzeug
+ for key in werkzeug.__all__:
+ # deprecated, skip it
+ if key in ('templates', 'Template'):
+ continue
+ getattr(werkzeug, key)
+
+ def test_fix_headers_in_response(self):
+ # ignore some warnings werkzeug emits for backwards compat
+ for msg in ['called into deprecated fix_headers',
+ 'fix_headers changed behavior']:
+ warnings.filterwarnings('ignore', message=msg,
+ category=DeprecationWarning)
+
+ class MyResponse(Response):
+ def fix_headers(self, environ):
+ Response.fix_headers(self, environ)
+ self.headers['x-foo'] = "meh"
+ myresp = MyResponse('Foo')
+ resp = Response.from_app(myresp, create_environ(method='GET'))
+ assert resp.headers['x-foo'] == 'meh'
+ assert resp.data == 'Foo'
+
+ warnings.resetwarnings()
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(CompatTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/__init__.py b/websdk/werkzeug/testsuite/contrib/__init__.py
new file mode 100644
index 0000000..b36a4ef
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/__init__.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.contrib
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the contrib modules.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+from werkzeug.testsuite import iter_suites
+
+
+def suite():
+ suite = unittest.TestSuite()
+ for other_suite in iter_suites(__name__):
+ suite.addTest(other_suite)
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/cache.py b/websdk/werkzeug/testsuite/contrib/cache.py
new file mode 100644
index 0000000..209189a
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/cache.py
@@ -0,0 +1,152 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.cache
+ ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the cache system
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import os
+import time
+import unittest
+import tempfile
+import shutil
+
+from werkzeug.testsuite import WerkzeugTestCase
+from werkzeug.contrib import cache
+
+try:
+ import redis
+except ImportError:
+ redis = None
+
+
+
+class SimpleCacheTestCase(WerkzeugTestCase):
+
+ def test_get_dict(self):
+ c = cache.SimpleCache()
+ c.set('a', 'a')
+ c.set('b', 'b')
+ d = c.get_dict('a', 'b')
+ assert 'a' in d
+ assert 'a' == d['a']
+ assert 'b' in d
+ assert 'b' == d['b']
+
+ def test_set_many(self):
+ c = cache.SimpleCache()
+ c.set_many({0: 0, 1: 1, 2: 4})
+ assert c.get(2) == 4
+ c.set_many((i, i*i) for i in xrange(3))
+ assert c.get(2) == 4
+
+
+class FileSystemCacheTestCase(WerkzeugTestCase):
+
+ def test_set_get(self):
+ tmp_dir = tempfile.mkdtemp()
+ try:
+ c = cache.FileSystemCache(cache_dir=tmp_dir)
+ for i in range(3):
+ c.set(str(i), i * i)
+ for i in range(3):
+ result = c.get(str(i))
+ assert result == i * i
+ finally:
+ shutil.rmtree(tmp_dir)
+
+ def test_filesystemcache_prune(self):
+ THRESHOLD = 13
+ tmp_dir = tempfile.mkdtemp()
+ c = cache.FileSystemCache(cache_dir=tmp_dir, threshold=THRESHOLD)
+ for i in range(2 * THRESHOLD):
+ c.set(str(i), i)
+ cache_files = os.listdir(tmp_dir)
+ shutil.rmtree(tmp_dir)
+ assert len(cache_files) <= THRESHOLD
+
+
+ def test_filesystemcache_clear(self):
+ tmp_dir = tempfile.mkdtemp()
+ c = cache.FileSystemCache(cache_dir=tmp_dir)
+ c.set('foo', 'bar')
+ cache_files = os.listdir(tmp_dir)
+ assert len(cache_files) == 1
+ c.clear()
+ cache_files = os.listdir(tmp_dir)
+ assert len(cache_files) == 0
+ shutil.rmtree(tmp_dir)
+
+
+class RedisCacheTestCase(WerkzeugTestCase):
+
+ def make_cache(self):
+ return cache.RedisCache(key_prefix='werkzeug-test-case:')
+
+ def teardown(self):
+ self.make_cache().clear()
+
+ def test_get_set(self):
+ c = self.make_cache()
+ c.set('foo', 'bar')
+ assert c.get('foo') == 'bar'
+
+ def test_get_many(self):
+ c = self.make_cache()
+ c.set('foo', 'bar')
+ c.set('spam', 'eggs')
+ assert c.get_many('foo', 'spam') == ['bar', 'eggs']
+
+ def test_set_many(self):
+ c = self.make_cache()
+ c.set_many({'foo': 'bar', 'spam': 'eggs'})
+ assert c.get('foo') == 'bar'
+ assert c.get('spam') == 'eggs'
+
+ def test_expire(self):
+ c = self.make_cache()
+ c.set('foo', 'bar', 1)
+ time.sleep(2)
+ assert c.get('foo') is None
+
+ def test_add(self):
+ c = self.make_cache()
+ # sanity check that add() works like set()
+ c.add('foo', 'bar')
+ assert c.get('foo') == 'bar'
+ c.add('foo', 'qux')
+ assert c.get('foo') == 'bar'
+
+ def test_delete(self):
+ c = self.make_cache()
+ c.add('foo', 'bar')
+ assert c.get('foo') == 'bar'
+ c.delete('foo')
+ assert c.get('foo') is None
+
+ def test_delete_many(self):
+ c = self.make_cache()
+ c.add('foo', 'bar')
+ c.add('spam', 'eggs')
+ c.delete_many('foo', 'spam')
+ assert c.get('foo') is None
+ assert c.get('spam') is None
+
+ def test_inc_dec(self):
+ c = self.make_cache()
+ c.set('foo', 1)
+ assert c.inc('foo') == 2
+ assert c.dec('foo') == 1
+ c.delete('foo')
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SimpleCacheTestCase))
+ suite.addTest(unittest.makeSuite(FileSystemCacheTestCase))
+ if redis is not None:
+ suite.addTest(unittest.makeSuite(RedisCacheTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/fixers.py b/websdk/werkzeug/testsuite/contrib/fixers.py
new file mode 100644
index 0000000..0422f8b
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/fixers.py
@@ -0,0 +1,187 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.fixers
+ ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Server / Browser fixers.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+from werkzeug.datastructures import ResponseCacheControl
+from werkzeug.http import parse_cache_control_header
+
+from werkzeug.test import create_environ, Client
+from werkzeug.wrappers import Request, Response
+from werkzeug.contrib import fixers
+from werkzeug.utils import redirect
+
+
+@Request.application
+def path_check_app(request):
+ return Response('PATH_INFO: %s\nSCRIPT_NAME: %s' % (
+ request.environ.get('PATH_INFO', ''),
+ request.environ.get('SCRIPT_NAME', '')
+ ))
+
+
+class ServerFixerTestCase(WerkzeugTestCase):
+
+ def test_lighttpd_cgi_root_fix(self):
+ app = fixers.LighttpdCGIRootFix(path_check_app)
+ response = Response.from_app(app, dict(create_environ(),
+ SCRIPT_NAME='/foo',
+ PATH_INFO='/bar',
+ SERVER_SOFTWARE='lighttpd/1.4.27'
+ ))
+ assert response.data == 'PATH_INFO: /foo/bar\nSCRIPT_NAME: '
+
+ def test_path_info_from_request_uri_fix(self):
+ app = fixers.PathInfoFromRequestUriFix(path_check_app)
+ for key in 'REQUEST_URI', 'REQUEST_URL', 'UNENCODED_URL':
+ env = dict(create_environ(), SCRIPT_NAME='/test', PATH_INFO='/?????')
+ env[key] = '/test/foo%25bar?drop=this'
+ response = Response.from_app(app, env)
+ assert response.data == 'PATH_INFO: /foo%bar\nSCRIPT_NAME: /test'
+
+ def test_proxy_fix(self):
+ """Test the ProxyFix fixer"""
+ @fixers.ProxyFix
+ @Request.application
+ def app(request):
+ return Response('%s|%s' % (
+ request.remote_addr,
+ # do not use request.host as this fixes too :)
+ request.environ['HTTP_HOST']
+ ))
+ environ = dict(create_environ(),
+ HTTP_X_FORWARDED_PROTO="https",
+ HTTP_X_FORWARDED_HOST='example.com',
+ HTTP_X_FORWARDED_FOR='1.2.3.4, 5.6.7.8',
+ REMOTE_ADDR='127.0.0.1',
+ HTTP_HOST='fake'
+ )
+
+ response = Response.from_app(app, environ)
+
+ assert response.data == '1.2.3.4|example.com'
+
+ # And we must check that if it is a redirection it is
+ # correctly done:
+
+ redirect_app = redirect('/foo/bar.hml')
+ response = Response.from_app(redirect_app, environ)
+
+ wsgi_headers = response.get_wsgi_headers(environ)
+ assert wsgi_headers['Location'] == 'https://example.com/foo/bar.hml'
+
+ def test_proxy_fix_weird_enum(self):
+ @fixers.ProxyFix
+ @Request.application
+ def app(request):
+ return Response(request.remote_addr)
+ environ = dict(create_environ(),
+ HTTP_X_FORWARDED_FOR=',',
+ REMOTE_ADDR='127.0.0.1',
+ )
+
+ response = Response.from_app(app, environ)
+ self.assert_equal(response.data, '127.0.0.1')
+
+ def test_header_rewriter_fix(self):
+ """Test the HeaderRewriterFix fixer"""
+ @Request.application
+ def application(request):
+ return Response("", headers=[
+ ('X-Foo', 'bar')
+ ])
+ application = fixers.HeaderRewriterFix(application, ('X-Foo',), (('X-Bar', '42'),))
+ response = Response.from_app(application, create_environ())
+ assert response.headers['Content-Type'] == 'text/plain; charset=utf-8'
+ assert 'X-Foo' not in response.headers
+ assert response.headers['X-Bar'] == '42'
+
+
+class BrowserFixerTestCase(WerkzeugTestCase):
+
+ def test_ie_fixes(self):
+ @fixers.InternetExplorerFix
+ @Request.application
+ def application(request):
+ response = Response('binary data here', mimetype='application/vnd.ms-excel')
+ response.headers['Vary'] = 'Cookie'
+ response.headers['Content-Disposition'] = 'attachment; filename=foo.xls'
+ return response
+
+ c = Client(application, Response)
+ response = c.get('/', headers=[
+ ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)')
+ ])
+
+ # IE gets no vary
+ assert response.data == 'binary data here'
+ assert 'vary' not in response.headers
+ assert response.headers['content-disposition'] == 'attachment; filename=foo.xls'
+ assert response.headers['content-type'] == 'application/vnd.ms-excel'
+
+ # other browsers do
+ c = Client(application, Response)
+ response = c.get('/')
+ assert response.data == 'binary data here'
+ assert 'vary' in response.headers
+
+ cc = ResponseCacheControl()
+ cc.no_cache = True
+
+ @fixers.InternetExplorerFix
+ @Request.application
+ def application(request):
+ response = Response('binary data here', mimetype='application/vnd.ms-excel')
+ response.headers['Pragma'] = ', '.join(pragma)
+ response.headers['Cache-Control'] = cc.to_header()
+ response.headers['Content-Disposition'] = 'attachment; filename=foo.xls'
+ return response
+
+
+ # IE has no pragma or cache control
+ pragma = ('no-cache',)
+ c = Client(application, Response)
+ response = c.get('/', headers=[
+ ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)')
+ ])
+ assert response.data == 'binary data here'
+ assert 'pragma' not in response.headers
+ assert 'cache-control' not in response.headers
+ assert response.headers['content-disposition'] == 'attachment; filename=foo.xls'
+
+ # IE has simplified pragma
+ pragma = ('no-cache', 'x-foo')
+ cc.proxy_revalidate = True
+ response = c.get('/', headers=[
+ ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)')
+ ])
+ assert response.data == 'binary data here'
+ assert response.headers['pragma'] == 'x-foo'
+ assert response.headers['cache-control'] == 'proxy-revalidate'
+ assert response.headers['content-disposition'] == 'attachment; filename=foo.xls'
+
+ # regular browsers get everything
+ response = c.get('/')
+ assert response.data == 'binary data here'
+ assert response.headers['pragma'] == 'no-cache, x-foo'
+ cc = parse_cache_control_header(response.headers['cache-control'],
+ cls=ResponseCacheControl)
+ assert cc.no_cache
+ assert cc.proxy_revalidate
+ assert response.headers['content-disposition'] == 'attachment; filename=foo.xls'
+
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ServerFixerTestCase))
+ suite.addTest(unittest.makeSuite(BrowserFixerTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/iterio.py b/websdk/werkzeug/testsuite/contrib/iterio.py
new file mode 100644
index 0000000..cddbd6e
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/iterio.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.iterio
+ ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the iterio object.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+from werkzeug.contrib.iterio import IterIO, greenlet
+
+
+class IterOTestSuite(WerkzeugTestCase):
+
+ def test_basic(self):
+ io = IterIO(["Hello", "World", "1", "2", "3"])
+ assert io.tell() == 0
+ assert io.read(2) == "He"
+ assert io.tell() == 2
+ assert io.read(3) == "llo"
+ assert io.tell() == 5
+ io.seek(0)
+ assert io.read(5) == "Hello"
+ assert io.tell() == 5
+ assert io._buf == "Hello"
+ assert io.read() == "World123"
+ assert io.tell() == 13
+ io.close()
+ assert io.closed
+
+ io = IterIO(["Hello\n", "World!"])
+ assert io.readline() == 'Hello\n'
+ assert io._buf == 'Hello\n'
+ assert io.read() == 'World!'
+ assert io._buf == 'Hello\nWorld!'
+ assert io.tell() == 12
+ io.seek(0)
+ assert io.readlines() == ['Hello\n', 'World!']
+
+ io = IterIO(["foo\n", "bar"])
+ io.seek(-4, 2)
+ assert io.read(4) == '\nbar'
+
+ self.assert_raises(IOError, io.seek, 2, 100)
+ io.close()
+ self.assert_raises(ValueError, io.read)
+
+
+class IterITestSuite(WerkzeugTestCase):
+
+ def test_basic(self):
+ def producer(out):
+ out.write('1\n')
+ out.write('2\n')
+ out.flush()
+ out.write('3\n')
+ iterable = IterIO(producer)
+ self.assert_equal(iterable.next(), '1\n2\n')
+ self.assert_equal(iterable.next(), '3\n')
+ self.assert_raises(StopIteration, iterable.next)
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(IterOTestSuite))
+ if greenlet is not None:
+ suite.addTest(unittest.makeSuite(IterITestSuite))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/securecookie.py b/websdk/werkzeug/testsuite/contrib/securecookie.py
new file mode 100644
index 0000000..fb10fa1
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/securecookie.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.securecookie
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the secure cookie.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.utils import parse_cookie
+from werkzeug.wrappers import Request, Response
+from werkzeug.contrib.securecookie import SecureCookie
+
+
+class SecureCookieTestCase(WerkzeugTestCase):
+
+ def test_basic_support(self):
+ c = SecureCookie(secret_key='foo')
+ assert c.new
+ print c.modified, c.should_save
+ assert not c.modified
+ assert not c.should_save
+ c['x'] = 42
+ assert c.modified
+ assert c.should_save
+ s = c.serialize()
+
+ c2 = SecureCookie.unserialize(s, 'foo')
+ assert c is not c2
+ assert not c2.new
+ assert not c2.modified
+ assert not c2.should_save
+ assert c2 == c
+
+ c3 = SecureCookie.unserialize(s, 'wrong foo')
+ assert not c3.modified
+ assert not c3.new
+ assert c3 == {}
+
+ def test_wrapper_support(self):
+ req = Request.from_values()
+ resp = Response()
+ c = SecureCookie.load_cookie(req, secret_key='foo')
+ assert c.new
+ c['foo'] = 42
+ assert c.secret_key == 'foo'
+ c.save_cookie(resp)
+
+ req = Request.from_values(headers={
+ 'Cookie': 'session="%s"' % parse_cookie(resp.headers['set-cookie'])['session']
+ })
+ c2 = SecureCookie.load_cookie(req, secret_key='foo')
+ assert not c2.new
+ assert c2 == c
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SecureCookieTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/sessions.py b/websdk/werkzeug/testsuite/contrib/sessions.py
new file mode 100644
index 0000000..18015a6
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/sessions.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.sessions
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Added tests for the sessions.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+import shutil
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.contrib.sessions import FilesystemSessionStore
+
+from tempfile import mkdtemp, gettempdir
+
+
+class SessionTestCase(WerkzeugTestCase):
+
+ def setup(self):
+ self.session_folder = mkdtemp()
+
+ def teardown(self):
+ shutil.rmtree(self.session_folder)
+
+ def test_default_tempdir(self):
+ store = FilesystemSessionStore()
+ assert store.path == gettempdir()
+
+ def test_basic_fs_sessions(self):
+ store = FilesystemSessionStore(self.session_folder)
+ x = store.new()
+ assert x.new
+ assert not x.modified
+ x['foo'] = [1, 2, 3]
+ assert x.modified
+ store.save(x)
+
+ x2 = store.get(x.sid)
+ assert not x2.new
+ assert not x2.modified
+ assert x2 is not x
+ assert x2 == x
+ x2['test'] = 3
+ assert x2.modified
+ assert not x2.new
+ store.save(x2)
+
+ x = store.get(x.sid)
+ store.delete(x)
+ x2 = store.get(x.sid)
+ # the session is not new when it was used previously.
+ assert not x2.new
+
+ def test_renewing_fs_session(self):
+ store = FilesystemSessionStore(self.session_folder, renew_missing=True)
+ x = store.new()
+ store.save(x)
+ store.delete(x)
+ x2 = store.get(x.sid)
+ assert x2.new
+
+ def test_fs_session_lising(self):
+ store = FilesystemSessionStore(self.session_folder, renew_missing=True)
+ sessions = set()
+ for x in xrange(10):
+ sess = store.new()
+ store.save(sess)
+ sessions.add(sess.sid)
+
+ listed_sessions = set(store.list())
+ assert sessions == listed_sessions
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SessionTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/contrib/wrappers.py b/websdk/werkzeug/testsuite/contrib/wrappers.py
new file mode 100644
index 0000000..e22d9a4
--- /dev/null
+++ b/websdk/werkzeug/testsuite/contrib/wrappers.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.contrib.wrappers
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Added tests for the sessions.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.contrib import wrappers
+from werkzeug import routing
+from werkzeug.wrappers import Request, Response
+
+
+class WrappersTestCase(WerkzeugTestCase):
+
+ def test_reverse_slash_behavior(self):
+ class MyRequest(wrappers.ReverseSlashBehaviorRequestMixin, Request):
+ pass
+ req = MyRequest.from_values('/foo/bar', 'http://example.com/test')
+ assert req.url == 'http://example.com/test/foo/bar'
+ assert req.path == 'foo/bar'
+ assert req.script_root == '/test/'
+
+ # make sure the routing system works with the slashes in
+ # reverse order as well.
+ map = routing.Map([routing.Rule('/foo/bar', endpoint='foo')])
+ adapter = map.bind_to_environ(req.environ)
+ assert adapter.match() == ('foo', {})
+ adapter = map.bind(req.host, req.script_root)
+ assert adapter.match(req.path) == ('foo', {})
+
+ def test_dynamic_charset_request_mixin(self):
+ class MyRequest(wrappers.DynamicCharsetRequestMixin, Request):
+ pass
+ env = {'CONTENT_TYPE': 'text/html'}
+ req = MyRequest(env)
+ assert req.charset == 'latin1'
+
+ env = {'CONTENT_TYPE': 'text/html; charset=utf-8'}
+ req = MyRequest(env)
+ assert req.charset == 'utf-8'
+
+ env = {'CONTENT_TYPE': 'application/octet-stream'}
+ req = MyRequest(env)
+ assert req.charset == 'latin1'
+ assert req.url_charset == 'latin1'
+
+ MyRequest.url_charset = 'utf-8'
+ env = {'CONTENT_TYPE': 'application/octet-stream'}
+ req = MyRequest(env)
+ assert req.charset == 'latin1'
+ assert req.url_charset == 'utf-8'
+
+ def return_ascii(x):
+ return "ascii"
+ env = {'CONTENT_TYPE': 'text/plain; charset=x-weird-charset'}
+ req = MyRequest(env)
+ req.unknown_charset = return_ascii
+ assert req.charset == 'ascii'
+ assert req.url_charset == 'utf-8'
+
+ def test_dynamic_charset_response_mixin(self):
+ class MyResponse(wrappers.DynamicCharsetResponseMixin, Response):
+ default_charset = 'utf-7'
+ resp = MyResponse(mimetype='text/html')
+ assert resp.charset == 'utf-7'
+ resp.charset = 'utf-8'
+ assert resp.charset == 'utf-8'
+ assert resp.mimetype == 'text/html'
+ assert resp.mimetype_params == {'charset': 'utf-8'}
+ resp.mimetype_params['charset'] = 'iso-8859-15'
+ assert resp.charset == 'iso-8859-15'
+ resp.data = u'Hällo Wörld'
+ assert ''.join(resp.iter_encoded()) == \
+ u'Hällo Wörld'.encode('iso-8859-15')
+ del resp.headers['content-type']
+ try:
+ resp.charset = 'utf-8'
+ except TypeError, e:
+ pass
+ else:
+ assert False, 'expected type error on charset setting without ct'
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(WrappersTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/datastructures.py b/websdk/werkzeug/testsuite/datastructures.py
new file mode 100644
index 0000000..3efedb7
--- /dev/null
+++ b/websdk/werkzeug/testsuite/datastructures.py
@@ -0,0 +1,618 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.datastructures
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the functionality of the provided Werkzeug
+ datastructures.
+
+ TODO:
+
+ - FileMultiDict
+ - convert to proper asserts
+ - Immutable types undertested
+ - Split up dict tests
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+import pickle
+from copy import copy
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import datastructures
+from werkzeug.exceptions import BadRequestKeyError
+
+
+class MutableMultiDictBaseTestCase(WerkzeugTestCase):
+ storage_class = None
+
+ def test_pickle(self):
+ cls = self.storage_class
+
+ for protocol in xrange(pickle.HIGHEST_PROTOCOL + 1):
+ d = cls()
+ d.setlist('foo', [1, 2, 3, 4])
+ d.setlist('bar', 'foo bar baz'.split())
+ s = pickle.dumps(d, protocol)
+ ud = pickle.loads(s)
+ self.assert_equal(type(ud), type(d))
+ self.assert_equal(ud, d)
+ self.assert_equal(pickle.loads(
+ s.replace('werkzeug.datastructures', 'werkzeug')), d)
+ ud['newkey'] = 'bla'
+ self.assert_not_equal(ud, d)
+
+ def test_basic_interface(self):
+ md = self.storage_class()
+ assert isinstance(md, dict)
+
+ mapping = [('a', 1), ('b', 2), ('a', 2), ('d', 3),
+ ('a', 1), ('a', 3), ('d', 4), ('c', 3)]
+ md = self.storage_class(mapping)
+
+ # simple getitem gives the first value
+ assert md['a'] == 1
+ assert md['c'] == 3
+ with self.assert_raises(KeyError):
+ md['e']
+ assert md.get('a') == 1
+
+ # list getitem
+ assert md.getlist('a') == [1, 2, 1, 3]
+ assert md.getlist('d') == [3, 4]
+ # do not raise if key not found
+ assert md.getlist('x') == []
+
+ # simple setitem overwrites all values
+ md['a'] = 42
+ assert md.getlist('a') == [42]
+
+ # list setitem
+ md.setlist('a', [1, 2, 3])
+ assert md['a'] == 1
+ assert md.getlist('a') == [1, 2, 3]
+
+ # verify that it does not change original lists
+ l1 = [1, 2, 3]
+ md.setlist('a', l1)
+ del l1[:]
+ assert md['a'] == 1
+
+ # setdefault, setlistdefault
+ assert md.setdefault('u', 23) == 23
+ assert md.getlist('u') == [23]
+ del md['u']
+
+ md.setlist('u', [-1, -2])
+
+ # delitem
+ del md['u']
+ with self.assert_raises(KeyError):
+ md['u']
+ del md['d']
+ assert md.getlist('d') == []
+
+ # keys, values, items, lists
+ assert list(sorted(md.keys())) == ['a', 'b', 'c']
+ assert list(sorted(md.iterkeys())) == ['a', 'b', 'c']
+
+ assert list(sorted(md.values())) == [1, 2, 3]
+ assert list(sorted(md.itervalues())) == [1, 2, 3]
+
+ assert list(sorted(md.items())) == [('a', 1), ('b', 2), ('c', 3)]
+ assert list(sorted(md.items(multi=True))) == \
+ [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('c', 3)]
+ assert list(sorted(md.iteritems())) == [('a', 1), ('b', 2), ('c', 3)]
+ assert list(sorted(md.iteritems(multi=True))) == \
+ [('a', 1), ('a', 2), ('a', 3), ('b', 2), ('c', 3)]
+
+ assert list(sorted(md.lists())) == [('a', [1, 2, 3]), ('b', [2]), ('c', [3])]
+ assert list(sorted(md.iterlists())) == [('a', [1, 2, 3]), ('b', [2]), ('c', [3])]
+
+ # copy method
+ c = md.copy()
+ assert c['a'] == 1
+ assert c.getlist('a') == [1, 2, 3]
+
+ # copy method 2
+ c = copy(md)
+ assert c['a'] == 1
+ assert c.getlist('a') == [1, 2, 3]
+
+ # update with a multidict
+ od = self.storage_class([('a', 4), ('a', 5), ('y', 0)])
+ md.update(od)
+ assert md.getlist('a') == [1, 2, 3, 4, 5]
+ assert md.getlist('y') == [0]
+
+ # update with a regular dict
+ md = c
+ od = {'a': 4, 'y': 0}
+ md.update(od)
+ assert md.getlist('a') == [1, 2, 3, 4]
+ assert md.getlist('y') == [0]
+
+ # pop, poplist, popitem, popitemlist
+ assert md.pop('y') == 0
+ assert 'y' not in md
+ assert md.poplist('a') == [1, 2, 3, 4]
+ assert 'a' not in md
+ assert md.poplist('missing') == []
+
+ # remaining: b=2, c=3
+ popped = md.popitem()
+ assert popped in [('b', 2), ('c', 3)]
+ popped = md.popitemlist()
+ assert popped in [('b', [2]), ('c', [3])]
+
+ # type conversion
+ md = self.storage_class({'a': '4', 'b': ['2', '3']})
+ assert md.get('a', type=int) == 4
+ assert md.getlist('b', type=int) == [2, 3]
+
+ # repr
+ md = self.storage_class([('a', 1), ('a', 2), ('b', 3)])
+ assert "('a', 1)" in repr(md)
+ assert "('a', 2)" in repr(md)
+ assert "('b', 3)" in repr(md)
+
+ # add and getlist
+ md.add('c', '42')
+ md.add('c', '23')
+ assert md.getlist('c') == ['42', '23']
+ md.add('c', 'blah')
+ assert md.getlist('c', type=int) == [42, 23]
+
+ # setdefault
+ md = self.storage_class()
+ md.setdefault('x', []).append(42)
+ md.setdefault('x', []).append(23)
+ assert md['x'] == [42, 23]
+
+ # to dict
+ md = self.storage_class()
+ md['foo'] = 42
+ md.add('bar', 1)
+ md.add('bar', 2)
+ assert md.to_dict() == {'foo': 42, 'bar': 1}
+ assert md.to_dict(flat=False) == {'foo': [42], 'bar': [1, 2]}
+
+ # popitem from empty dict
+ with self.assert_raises(KeyError):
+ self.storage_class().popitem()
+
+ with self.assert_raises(KeyError):
+ self.storage_class().popitemlist()
+
+ # key errors are of a special type
+ with self.assert_raises(BadRequestKeyError):
+ self.storage_class()[42]
+
+ # setlist works
+ md = self.storage_class()
+ md['foo'] = 42
+ md.setlist('foo', [1, 2])
+ assert md.getlist('foo') == [1, 2]
+
+
+class ImmutableDictBaseTestCase(WerkzeugTestCase):
+ storage_class = None
+
+ def test_follows_dict_interface(self):
+ cls = self.storage_class
+
+ data = {'foo': 1, 'bar': 2, 'baz': 3}
+ d = cls(data)
+
+ self.assert_equal(d['foo'], 1)
+ self.assert_equal(d['bar'], 2)
+ self.assert_equal(d['baz'], 3)
+ self.assert_equal(sorted(d.keys()), ['bar', 'baz', 'foo'])
+ self.assert_('foo' in d)
+ self.assert_('foox' not in d)
+ self.assert_equal(len(d), 3)
+
+ def test_copies_are_mutable(self):
+ cls = self.storage_class
+ immutable = cls({'a': 1})
+ with self.assert_raises(TypeError):
+ immutable.pop('a')
+
+ mutable = immutable.copy()
+ mutable.pop('a')
+ self.assert_('a' in immutable)
+ self.assert_(mutable is not immutable)
+ self.assert_(copy(immutable) is immutable)
+
+ def test_dict_is_hashable(self):
+ cls = self.storage_class
+ immutable = cls({'a': 1, 'b': 2})
+ immutable2 = cls({'a': 2, 'b': 2})
+ x = set([immutable])
+ self.assert_(immutable in x)
+ self.assert_(immutable2 not in x)
+ x.discard(immutable)
+ self.assert_(immutable not in x)
+ self.assert_(immutable2 not in x)
+ x.add(immutable2)
+ self.assert_(immutable not in x)
+ self.assert_(immutable2 in x)
+ x.add(immutable)
+ self.assert_(immutable in x)
+ self.assert_(immutable2 in x)
+
+
+class ImmutableTypeConversionDictTestCase(ImmutableDictBaseTestCase):
+ storage_class = datastructures.ImmutableTypeConversionDict
+
+
+class ImmutableMultiDictTestCase(ImmutableDictBaseTestCase):
+ storage_class = datastructures.ImmutableMultiDict
+
+ def test_multidict_is_hashable(self):
+ cls = self.storage_class
+ immutable = cls({'a': [1, 2], 'b': 2})
+ immutable2 = cls({'a': [1], 'b': 2})
+ x = set([immutable])
+ self.assert_(immutable in x)
+ self.assert_(immutable2 not in x)
+ x.discard(immutable)
+ self.assert_(immutable not in x)
+ self.assert_(immutable2 not in x)
+ x.add(immutable2)
+ self.assert_(immutable not in x)
+ self.assert_(immutable2 in x)
+ x.add(immutable)
+ self.assert_(immutable in x)
+ self.assert_(immutable2 in x)
+
+
+class ImmutableDictTestCase(ImmutableDictBaseTestCase):
+ storage_class = datastructures.ImmutableDict
+
+
+class ImmutableOrderedMultiDictTestCase(ImmutableDictBaseTestCase):
+ storage_class = datastructures.ImmutableOrderedMultiDict
+
+ def test_ordered_multidict_is_hashable(self):
+ a = self.storage_class([('a', 1), ('b', 1), ('a', 2)])
+ b = self.storage_class([('a', 1), ('a', 2), ('b', 1)])
+ self.assert_not_equal(hash(a), hash(b))
+
+
+class MultiDictTestCase(MutableMultiDictBaseTestCase):
+ storage_class = datastructures.MultiDict
+
+ def test_multidict_pop(self):
+ make_d = lambda: self.storage_class({'foo': [1, 2, 3, 4]})
+ d = make_d()
+ assert d.pop('foo') == 1
+ assert not d
+ d = make_d()
+ assert d.pop('foo', 32) == 1
+ assert not d
+ d = make_d()
+ assert d.pop('foos', 32) == 32
+ assert d
+
+ with self.assert_raises(KeyError):
+ d.pop('foos')
+
+ def test_setlistdefault(self):
+ md = self.storage_class()
+ assert md.setlistdefault('u', [-1, -2]) == [-1, -2]
+ assert md.getlist('u') == [-1, -2]
+ assert md['u'] == -1
+
+ def test_iter_interfaces(self):
+ mapping = [('a', 1), ('b', 2), ('a', 2), ('d', 3),
+ ('a', 1), ('a', 3), ('d', 4), ('c', 3)]
+ md = self.storage_class(mapping)
+ assert list(zip(md.keys(), md.listvalues())) == list(md.lists())
+ assert list(zip(md, md.iterlistvalues())) == list(md.iterlists())
+ assert list(zip(md.iterkeys(), md.iterlistvalues())) == list(md.iterlists())
+
+
+class OrderedMultiDictTestCase(MutableMultiDictBaseTestCase):
+ storage_class = datastructures.OrderedMultiDict
+
+ def test_ordered_interface(self):
+ cls = self.storage_class
+
+ d = cls()
+ assert not d
+ d.add('foo', 'bar')
+ assert len(d) == 1
+ d.add('foo', 'baz')
+ assert len(d) == 1
+ assert d.items() == [('foo', 'bar')]
+ assert list(d) == ['foo']
+ assert d.items(multi=True) == [('foo', 'bar'),
+ ('foo', 'baz')]
+ del d['foo']
+ assert not d
+ assert len(d) == 0
+ assert list(d) == []
+
+ d.update([('foo', 1), ('foo', 2), ('bar', 42)])
+ d.add('foo', 3)
+ assert d.getlist('foo') == [1, 2, 3]
+ assert d.getlist('bar') == [42]
+ assert d.items() == [('foo', 1), ('bar', 42)]
+ assert d.keys() == list(d) == list(d.iterkeys()) == ['foo', 'bar']
+ assert d.items(multi=True) == [('foo', 1), ('foo', 2),
+ ('bar', 42), ('foo', 3)]
+ assert len(d) == 2
+
+ assert d.pop('foo') == 1
+ assert d.pop('blafasel', None) is None
+ assert d.pop('blafasel', 42) == 42
+ assert len(d) == 1
+ assert d.poplist('bar') == [42]
+ assert not d
+
+ d.get('missingkey') is None
+
+ d.add('foo', 42)
+ d.add('foo', 23)
+ d.add('bar', 2)
+ d.add('foo', 42)
+ assert d == datastructures.MultiDict(d)
+ id = self.storage_class(d)
+ assert d == id
+ d.add('foo', 2)
+ assert d != id
+
+ d.update({'blah': [1, 2, 3]})
+ assert d['blah'] == 1
+ assert d.getlist('blah') == [1, 2, 3]
+
+ # setlist works
+ d = self.storage_class()
+ d['foo'] = 42
+ d.setlist('foo', [1, 2])
+ assert d.getlist('foo') == [1, 2]
+
+ with self.assert_raises(BadRequestKeyError):
+ d.pop('missing')
+ with self.assert_raises(BadRequestKeyError):
+ d['missing']
+
+ # popping
+ d = self.storage_class()
+ d.add('foo', 23)
+ d.add('foo', 42)
+ d.add('foo', 1)
+ assert d.popitem() == ('foo', 23)
+ with self.assert_raises(BadRequestKeyError):
+ d.popitem()
+ assert not d
+
+ d.add('foo', 23)
+ d.add('foo', 42)
+ d.add('foo', 1)
+ assert d.popitemlist() == ('foo', [23, 42, 1])
+
+ with self.assert_raises(BadRequestKeyError):
+ d.popitemlist()
+
+
+class CombinedMultiDictTestCase(WerkzeugTestCase):
+ storage_class = datastructures.CombinedMultiDict
+
+ def test_basic_interface(self):
+ d1 = datastructures.MultiDict([('foo', '1')])
+ d2 = datastructures.MultiDict([('bar', '2'), ('bar', '3')])
+ d = self.storage_class([d1, d2])
+
+ # lookup
+ assert d['foo'] == '1'
+ assert d['bar'] == '2'
+ assert d.getlist('bar') == ['2', '3']
+
+ assert sorted(d.items()) == [('bar', '2'), ('foo', '1')], d.items()
+ assert sorted(d.items(multi=True)) == [('bar', '2'), ('bar', '3'), ('foo', '1')]
+ assert 'missingkey' not in d
+ assert 'foo' in d
+
+ # type lookup
+ assert d.get('foo', type=int) == 1
+ assert d.getlist('bar', type=int) == [2, 3]
+
+ # get key errors for missing stuff
+ with self.assert_raises(KeyError):
+ d['missing']
+
+ # make sure that they are immutable
+ with self.assert_raises(TypeError):
+ d['foo'] = 'blub'
+
+ # copies are immutable
+ d = d.copy()
+ with self.assert_raises(TypeError):
+ d['foo'] = 'blub'
+
+ # make sure lists merges
+ md1 = datastructures.MultiDict((("foo", "bar"),))
+ md2 = datastructures.MultiDict((("foo", "blafasel"),))
+ x = self.storage_class((md1, md2))
+ assert x.lists() == [('foo', ['bar', 'blafasel'])]
+
+
+class HeadersTestCase(WerkzeugTestCase):
+ storage_class = datastructures.Headers
+
+ def test_basic_interface(self):
+ headers = self.storage_class()
+ headers.add('Content-Type', 'text/plain')
+ headers.add('X-Foo', 'bar')
+ assert 'x-Foo' in headers
+ assert 'Content-type' in headers
+
+ headers['Content-Type'] = 'foo/bar'
+ assert headers['Content-Type'] == 'foo/bar'
+ assert len(headers.getlist('Content-Type')) == 1
+
+ # list conversion
+ assert headers.to_list() == [
+ ('Content-Type', 'foo/bar'),
+ ('X-Foo', 'bar')
+ ]
+ assert str(headers) == (
+ "Content-Type: foo/bar\r\n"
+ "X-Foo: bar\r\n"
+ "\r\n")
+ assert str(self.storage_class()) == "\r\n"
+
+ # extended add
+ headers.add('Content-Disposition', 'attachment', filename='foo')
+ assert headers['Content-Disposition'] == 'attachment; filename=foo'
+
+ headers.add('x', 'y', z='"')
+ assert headers['x'] == r'y; z="\""'
+
+ def test_defaults_and_conversion(self):
+ # defaults
+ headers = self.storage_class([
+ ('Content-Type', 'text/plain'),
+ ('X-Foo', 'bar'),
+ ('X-Bar', '1'),
+ ('X-Bar', '2')
+ ])
+ assert headers.getlist('x-bar') == ['1', '2']
+ assert headers.get('x-Bar') == '1'
+ assert headers.get('Content-Type') == 'text/plain'
+
+ assert headers.setdefault('X-Foo', 'nope') == 'bar'
+ assert headers.setdefault('X-Bar', 'nope') == '1'
+ assert headers.setdefault('X-Baz', 'quux') == 'quux'
+ assert headers.setdefault('X-Baz', 'nope') == 'quux'
+ headers.pop('X-Baz')
+
+ # type conversion
+ assert headers.get('x-bar', type=int) == 1
+ assert headers.getlist('x-bar', type=int) == [1, 2]
+
+ # list like operations
+ assert headers[0] == ('Content-Type', 'text/plain')
+ assert headers[:1] == self.storage_class([('Content-Type', 'text/plain')])
+ del headers[:2]
+ del headers[-1]
+ assert headers == self.storage_class([('X-Bar', '1')])
+
+ def test_copying(self):
+ a = self.storage_class([('foo', 'bar')])
+ b = a.copy()
+ a.add('foo', 'baz')
+ assert a.getlist('foo') == ['bar', 'baz']
+ assert b.getlist('foo') == ['bar']
+
+ def test_popping(self):
+ headers = self.storage_class([('a', 1)])
+ assert headers.pop('a') == 1
+ assert headers.pop('b', 2) == 2
+
+ with self.assert_raises(KeyError):
+ headers.pop('c')
+
+ def test_set_arguments(self):
+ a = self.storage_class()
+ a.set('Content-Disposition', 'useless')
+ a.set('Content-Disposition', 'attachment', filename='foo')
+ assert a['Content-Disposition'] == 'attachment; filename=foo'
+
+ def test_reject_newlines(self):
+ h = self.storage_class()
+
+ for variation in 'foo\nbar', 'foo\r\nbar', 'foo\rbar':
+ with self.assert_raises(ValueError):
+ h['foo'] = variation
+ with self.assert_raises(ValueError):
+ h.add('foo', variation)
+ with self.assert_raises(ValueError):
+ h.add('foo', 'test', option=variation)
+ with self.assert_raises(ValueError):
+ h.set('foo', variation)
+ with self.assert_raises(ValueError):
+ h.set('foo', 'test', option=variation)
+
+
+class EnvironHeadersTestCase(WerkzeugTestCase):
+ storage_class = datastructures.EnvironHeaders
+
+ def test_basic_interface(self):
+ # this happens in multiple WSGI servers because they
+ # use a vary naive way to convert the headers;
+ broken_env = {
+ 'HTTP_CONTENT_TYPE': 'text/html',
+ 'CONTENT_TYPE': 'text/html',
+ 'HTTP_CONTENT_LENGTH': '0',
+ 'CONTENT_LENGTH': '0',
+ 'HTTP_ACCEPT': '*',
+ 'wsgi.version': (1, 0)
+ }
+ headers = self.storage_class(broken_env)
+ assert headers
+ assert len(headers) == 3
+ assert sorted(headers) == [
+ ('Accept', '*'),
+ ('Content-Length', '0'),
+ ('Content-Type', 'text/html')
+ ]
+ assert not self.storage_class({'wsgi.version': (1, 0)})
+ assert len(self.storage_class({'wsgi.version': (1, 0)})) == 0
+
+
+class HeaderSetTestCase(WerkzeugTestCase):
+ storage_class = datastructures.HeaderSet
+
+ def test_basic_interface(self):
+ hs = self.storage_class()
+ hs.add('foo')
+ hs.add('bar')
+ assert 'Bar' in hs
+ assert hs.find('foo') == 0
+ assert hs.find('BAR') == 1
+ assert hs.find('baz') < 0
+ hs.discard('missing')
+ hs.discard('foo')
+ assert hs.find('foo') < 0
+ assert hs.find('bar') == 0
+
+ with self.assert_raises(IndexError):
+ hs.index('missing')
+
+ assert hs.index('bar') == 0
+ assert hs
+ hs.clear()
+ assert not hs
+
+
+class ImmutableListTestCase(WerkzeugTestCase):
+ storage_class = datastructures.ImmutableList
+
+ def test_list_hashable(self):
+ t = (1, 2, 3, 4)
+ l = self.storage_class(t)
+ self.assert_equal(hash(t), hash(l))
+ self.assert_not_equal(t, l)
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MultiDictTestCase))
+ suite.addTest(unittest.makeSuite(OrderedMultiDictTestCase))
+ suite.addTest(unittest.makeSuite(CombinedMultiDictTestCase))
+ suite.addTest(unittest.makeSuite(ImmutableTypeConversionDictTestCase))
+ suite.addTest(unittest.makeSuite(ImmutableMultiDictTestCase))
+ suite.addTest(unittest.makeSuite(ImmutableDictTestCase))
+ suite.addTest(unittest.makeSuite(ImmutableOrderedMultiDictTestCase))
+ suite.addTest(unittest.makeSuite(HeadersTestCase))
+ suite.addTest(unittest.makeSuite(EnvironHeadersTestCase))
+ suite.addTest(unittest.makeSuite(HeaderSetTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/debug.py b/websdk/werkzeug/testsuite/debug.py
new file mode 100644
index 0000000..ffe9a2e
--- /dev/null
+++ b/websdk/werkzeug/testsuite/debug.py
@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.debug
+ ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests some debug utilities.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+import sys
+import re
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.debug.repr import debug_repr, DebugReprGenerator, \
+ dump, helper
+from werkzeug.debug.console import HTMLStringO
+
+
+class DebugReprTestCase(WerkzeugTestCase):
+
+ def test_basic_repr(self):
+ assert debug_repr([]) == u'[]'
+ assert debug_repr([1, 2]) == \
+ u'[<span class="number">1</span>, <span class="number">2</span>]'
+ assert debug_repr([1, 'test']) == \
+ u'[<span class="number">1</span>, <span class="string">\'test\'</span>]'
+ assert debug_repr([None]) == \
+ u'[<span class="object">None</span>]'
+
+ def test_sequence_repr(self):
+ assert debug_repr(list(range(20))) == (
+ u'[<span class="number">0</span>, <span class="number">1</span>, '
+ u'<span class="number">2</span>, <span class="number">3</span>, '
+ u'<span class="number">4</span>, <span class="number">5</span>, '
+ u'<span class="number">6</span>, <span class="number">7</span>, '
+ u'<span class="extended"><span class="number">8</span>, '
+ u'<span class="number">9</span>, <span class="number">10</span>, '
+ u'<span class="number">11</span>, <span class="number">12</span>, '
+ u'<span class="number">13</span>, <span class="number">14</span>, '
+ u'<span class="number">15</span>, <span class="number">16</span>, '
+ u'<span class="number">17</span>, <span class="number">18</span>, '
+ u'<span class="number">19</span></span>]'
+ )
+
+ def test_mapping_repr(self):
+ assert debug_repr({}) == u'{}'
+ assert debug_repr({'foo': 42}) == \
+ u'{<span class="pair"><span class="key"><span class="string">\'foo\''\
+ u'</span></span>: <span class="value"><span class="number">42' \
+ u'</span></span></span>}'
+ assert debug_repr(dict(zip(range(10), [None] * 10))) == \
+ u'{<span class="pair"><span class="key"><span class="number">0</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">1</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">2</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">3</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="extended"><span class="pair"><span class="key"><span class="number">4</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">5</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">6</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">7</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">8</span></span>: <span class="value"><span class="object">None</span></span></span>, <span class="pair"><span class="key"><span class="number">9</span></span>: <span class="value"><span class="object">None</span></span></span></span>}'
+ assert debug_repr((1, 'zwei', u'drei')) ==\
+ u'(<span class="number">1</span>, <span class="string">\'' \
+ u'zwei\'</span>, <span class="string">u\'drei\'</span>)'
+
+ def test_custom_repr(self):
+ class Foo(object):
+ def __repr__(self):
+ return '<Foo 42>'
+ assert debug_repr(Foo()) == '<span class="object">&lt;Foo 42&gt;</span>'
+
+ def test_list_subclass_repr(self):
+ class MyList(list):
+ pass
+ assert debug_repr(MyList([1, 2])) == \
+ u'<span class="module">werkzeug.testsuite.debug.</span>MyList([' \
+ u'<span class="number">1</span>, <span class="number">2</span>])'
+
+ def test_regex_repr(self):
+ assert debug_repr(re.compile(r'foo\d')) == \
+ u're.compile(<span class="string regex">r\'foo\\d\'</span>)'
+ assert debug_repr(re.compile(ur'foo\d')) == \
+ u're.compile(<span class="string regex">ur\'foo\\d\'</span>)'
+
+ def test_set_repr(self):
+ assert debug_repr(frozenset('x')) == \
+ u'frozenset([<span class="string">\'x\'</span>])'
+ assert debug_repr(set('x')) == \
+ u'set([<span class="string">\'x\'</span>])'
+
+ def test_recursive_repr(self):
+ a = [1]
+ a.append(a)
+ assert debug_repr(a) == u'[<span class="number">1</span>, [...]]'
+
+ def test_broken_repr(self):
+ class Foo(object):
+ def __repr__(self):
+ 1/0
+
+ assert debug_repr(Foo()) == \
+ u'<span class="brokenrepr">&lt;broken repr (ZeroDivisionError: ' \
+ u'integer division or modulo by zero)&gt;</span>'
+
+
+class DebugHelpersTestCase(WerkzeugTestCase):
+
+ def test_object_dumping(self):
+ class Foo(object):
+ x = 42
+ y = 23
+ def __init__(self):
+ self.z = 15
+
+ drg = DebugReprGenerator()
+ out = drg.dump_object(Foo())
+ assert re.search('Details for werkzeug.testsuite.debug.Foo object at', out)
+ assert re.search('<th>x.*<span class="number">42</span>(?s)', out)
+ assert re.search('<th>y.*<span class="number">23</span>(?s)', out)
+ assert re.search('<th>z.*<span class="number">15</span>(?s)', out)
+
+ out = drg.dump_object({'x': 42, 'y': 23})
+ assert re.search('Contents of', out)
+ assert re.search('<th>x.*<span class="number">42</span>(?s)', out)
+ assert re.search('<th>y.*<span class="number">23</span>(?s)', out)
+
+ out = drg.dump_object({'x': 42, 'y': 23, 23: 11})
+ assert not re.search('Contents of', out)
+
+ out = drg.dump_locals({'x': 42, 'y': 23})
+ assert re.search('Local variables in frame', out)
+ assert re.search('<th>x.*<span class="number">42</span>(?s)', out)
+ assert re.search('<th>y.*<span class="number">23</span>(?s)', out)
+
+ def test_debug_dump(self):
+ old = sys.stdout
+ sys.stdout = HTMLStringO()
+ try:
+ dump([1, 2, 3])
+ x = sys.stdout.reset()
+ dump()
+ y = sys.stdout.reset()
+ finally:
+ sys.stdout = old
+
+ assert 'Details for list object at' in x
+ assert '<span class="number">1</span>' in x
+ assert 'Local variables in frame' in y
+ assert '<th>x' in y
+ assert '<th>old' in y
+
+ def test_debug_help(self):
+ old = sys.stdout
+ sys.stdout = HTMLStringO()
+ try:
+ helper([1, 2, 3])
+ x = sys.stdout.reset()
+ finally:
+ sys.stdout = old
+
+ assert 'Help on list object' in x
+ assert '__delitem__' in x
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(DebugReprTestCase))
+ suite.addTest(unittest.makeSuite(DebugHelpersTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/exceptions.py b/websdk/werkzeug/testsuite/exceptions.py
new file mode 100644
index 0000000..8530b9c
--- /dev/null
+++ b/websdk/werkzeug/testsuite/exceptions.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.exceptions
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ The tests for the exception classes.
+
+ TODO:
+
+ - This is undertested. HTML is never checked
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import exceptions
+from werkzeug.wrappers import Response
+
+
+class ExceptionsTestCase(WerkzeugTestCase):
+
+ def test_proxy_exception(self):
+ """Proxy exceptions"""
+ orig_resp = Response('Hello World')
+ try:
+ exceptions.abort(orig_resp)
+ except exceptions.HTTPException, e:
+ resp = e.get_response({})
+ else:
+ self.fail('exception not raised')
+ self.assert_(resp is orig_resp)
+ self.assert_equal(resp.data, 'Hello World')
+
+ def test_aborter(self):
+ """Exception aborter"""
+ abort = exceptions.abort
+ self.assert_raises(exceptions.BadRequest, abort, 400)
+ self.assert_raises(exceptions.Unauthorized, abort, 401)
+ self.assert_raises(exceptions.Forbidden, abort, 403)
+ self.assert_raises(exceptions.NotFound, abort, 404)
+ self.assert_raises(exceptions.MethodNotAllowed, abort, 405, ['GET', 'HEAD'])
+ self.assert_raises(exceptions.NotAcceptable, abort, 406)
+ self.assert_raises(exceptions.RequestTimeout, abort, 408)
+ self.assert_raises(exceptions.Gone, abort, 410)
+ self.assert_raises(exceptions.LengthRequired, abort, 411)
+ self.assert_raises(exceptions.PreconditionFailed, abort, 412)
+ self.assert_raises(exceptions.RequestEntityTooLarge, abort, 413)
+ self.assert_raises(exceptions.RequestURITooLarge, abort, 414)
+ self.assert_raises(exceptions.UnsupportedMediaType, abort, 415)
+ self.assert_raises(exceptions.InternalServerError, abort, 500)
+ self.assert_raises(exceptions.NotImplemented, abort, 501)
+ self.assert_raises(exceptions.BadGateway, abort, 502)
+ self.assert_raises(exceptions.ServiceUnavailable, abort, 503)
+
+ myabort = exceptions.Aborter({1: exceptions.NotFound})
+ self.assert_raises(LookupError, myabort, 404)
+ self.assert_raises(exceptions.NotFound, myabort, 1)
+
+ myabort = exceptions.Aborter(extra={1: exceptions.NotFound})
+ self.assert_raises(exceptions.NotFound, myabort, 404)
+ self.assert_raises(exceptions.NotFound, myabort, 1)
+
+ def test_exception_repr(self):
+ exc = exceptions.NotFound()
+ self.assert_equal(unicode(exc), '404: Not Found')
+ self.assert_equal(repr(exc), "<NotFound '404: Not Found'>")
+
+ exc = exceptions.NotFound('Not There')
+ self.assert_equal(unicode(exc), '404: Not There')
+ self.assert_equal(repr(exc), "<NotFound '404: Not There'>")
+
+ def test_special_exceptions(self):
+ exc = exceptions.MethodNotAllowed(['GET', 'HEAD', 'POST'])
+ h = dict(exc.get_headers({}))
+ self.assert_equal(h['Allow'], 'GET, HEAD, POST')
+ self.assert_('The method DELETE is not allowed' in exc.get_description({
+ 'REQUEST_METHOD': 'DELETE'
+ }))
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ExceptionsTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/formparser.py b/websdk/werkzeug/testsuite/formparser.py
new file mode 100644
index 0000000..7dd5b79
--- /dev/null
+++ b/websdk/werkzeug/testsuite/formparser.py
@@ -0,0 +1,370 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.formparser
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the form parsing facilities.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+from StringIO import StringIO
+from os.path import join, dirname
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import formparser
+from werkzeug.test import create_environ, Client
+from werkzeug.wrappers import Request, Response
+from werkzeug.exceptions import RequestEntityTooLarge
+
+
+@Request.application
+def form_data_consumer(request):
+ result_object = request.args['object']
+ if result_object == 'text':
+ return Response(repr(request.form['text']))
+ f = request.files[result_object]
+ return Response('\n'.join((
+ repr(f.filename),
+ repr(f.name),
+ repr(f.content_type),
+ f.stream.read()
+ )))
+
+
+def get_contents(filename):
+ f = file(filename, 'rb')
+ try:
+ return f.read()
+ finally:
+ f.close()
+
+
+class FormParserTestCase(WerkzeugTestCase):
+
+ def test_limiting(self):
+ """Test the limiting features"""
+ data = 'foo=Hello+World&bar=baz'
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='application/x-www-form-urlencoded',
+ method='POST')
+ req.max_content_length = 400
+ self.assert_equal(req.form['foo'], 'Hello World')
+
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='application/x-www-form-urlencoded',
+ method='POST')
+ req.max_form_memory_size = 7
+ self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
+
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='application/x-www-form-urlencoded',
+ method='POST')
+ req.max_form_memory_size = 400
+ self.assert_equal(req.form['foo'], 'Hello World')
+
+ data = ('--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n'
+ 'Hello World\r\n'
+ '--foo\r\nContent-Disposition: form-field; name=bar\r\n\r\n'
+ 'bar=baz\r\n--foo--')
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ req.max_content_length = 4
+ self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
+
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ req.max_content_length = 400
+ self.assert_equal(req.form['foo'], 'Hello World')
+
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ req.max_form_memory_size = 7
+ self.assert_raises(RequestEntityTooLarge, lambda: req.form['foo'])
+
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ req.max_form_memory_size = 400
+ self.assert_equal(req.form['foo'], 'Hello World')
+
+ def test_parse_form_data_put_without_content(self):
+ """A PUT without a Content-Type header returns empty data
+
+ Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message
+ containing an entity-body SHOULD include a Content-Type header field
+ defining the media type of that body." In the case where either
+ headers are omitted, parse_form_data should still work.
+ """
+ env = create_environ('/foo', 'http://example.org/', method='PUT')
+ del env['CONTENT_TYPE']
+ del env['CONTENT_LENGTH']
+
+ stream, form, files = formparser.parse_form_data(env)
+ self.assert_equal(stream.read(), '')
+ self.assert_equal(len(form), 0)
+ self.assert_equal(len(files), 0)
+
+ def test_parse_form_data_get_without_content(self):
+ """GET requests without data, content type and length returns no data"""
+ env = create_environ('/foo', 'http://example.org/', method='GET')
+ del env['CONTENT_TYPE']
+ del env['CONTENT_LENGTH']
+
+ stream, form, files = formparser.parse_form_data(env)
+ self.assert_equal(stream.read(), '')
+ self.assert_equal(len(form), 0)
+ self.assert_equal(len(files), 0)
+
+ def test_large_file(self):
+ """Test a largish file."""
+ data = 'x' * (1024 * 600)
+ req = Request.from_values(data={'foo': (StringIO(data), 'test.txt')},
+ method='POST')
+ # make sure we have a real file here, because we expect to be
+ # on the disk. > 1024 * 500
+ self.assert_(isinstance(req.files['foo'].stream, file))
+
+
+class MultiPartTestCase(WerkzeugTestCase):
+
+ def test_basic(self):
+ """Tests multipart parsing against data collected from webbrowsers"""
+ resources = join(dirname(__file__), 'multipart')
+ client = Client(form_data_consumer, Response)
+
+ repository = [
+ ('firefox3-2png1txt', '---------------------------186454651713519341951581030105', [
+ (u'anchor.png', 'file1', 'image/png', 'file1.png'),
+ (u'application_edit.png', 'file2', 'image/png', 'file2.png')
+ ], u'example text'),
+ ('firefox3-2pnglongtext', '---------------------------14904044739787191031754711748', [
+ (u'accept.png', 'file1', 'image/png', 'file1.png'),
+ (u'add.png', 'file2', 'image/png', 'file2.png')
+ ], u'--long text\r\n--with boundary\r\n--lookalikes--'),
+ ('opera8-2png1txt', '----------zEO9jQKmLc2Cq88c23Dx19', [
+ (u'arrow_branch.png', 'file1', 'image/png', 'file1.png'),
+ (u'award_star_bronze_1.png', 'file2', 'image/png', 'file2.png')
+ ], u'blafasel öäü'),
+ ('webkit3-2png1txt', '----WebKitFormBoundaryjdSFhcARk8fyGNy6', [
+ (u'gtk-apply.png', 'file1', 'image/png', 'file1.png'),
+ (u'gtk-no.png', 'file2', 'image/png', 'file2.png')
+ ], u'this is another text with ümläüts'),
+ ('ie6-2png1txt', '---------------------------7d91b03a20128', [
+ (u'file1.png', 'file1', 'image/x-png', 'file1.png'),
+ (u'file2.png', 'file2', 'image/x-png', 'file2.png')
+ ], u'ie6 sucks :-/')
+ ]
+
+ for name, boundary, files, text in repository:
+ folder = join(resources, name)
+ data = get_contents(join(folder, 'request.txt'))
+ for filename, field, content_type, fsname in files:
+ response = client.post('/?object=' + field, data=data, content_type=
+ 'multipart/form-data; boundary="%s"' % boundary,
+ content_length=len(data))
+ lines = response.data.split('\n', 3)
+ self.assert_equal(lines[0], repr(filename))
+ self.assert_equal(lines[1], repr(field))
+ self.assert_equal(lines[2], repr(content_type))
+ self.assert_equal(lines[3], get_contents(join(folder, fsname)))
+ response = client.post('/?object=text', data=data, content_type=
+ 'multipart/form-data; boundary="%s"' % boundary,
+ content_length=len(data))
+ self.assert_equal(response.data, repr(text))
+
+ def test_ie7_unc_path(self):
+ client = Client(form_data_consumer, Response)
+ data_file = join(dirname(__file__), 'multipart', 'ie7_full_path_request.txt')
+ data = get_contents(data_file)
+ boundary = '---------------------------7da36d1b4a0164'
+ response = client.post('/?object=cb_file_upload_multiple', data=data, content_type=
+ 'multipart/form-data; boundary="%s"' % boundary, content_length=len(data))
+ lines = response.data.split('\n', 3)
+ self.assert_equal(lines[0],
+ repr(u'Sellersburg Town Council Meeting 02-22-2010doc.doc'))
+
+ def test_end_of_file(self):
+ """Test for multipart files ending unexpectedly"""
+ # This test looks innocent but it was actually timeing out in
+ # the Werkzeug 0.5 release version (#394)
+ data = (
+ '--foo\r\n'
+ 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
+ 'Content-Type: text/plain\r\n\r\n'
+ 'file contents and no end'
+ )
+ data = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ self.assert_(not data.files)
+ self.assert_(not data.form)
+
+ def test_broken(self):
+ """Broken multipart does not break the applicaiton"""
+ data = (
+ '--foo\r\n'
+ 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
+ 'Content-Transfer-Encoding: base64\r\n'
+ 'Content-Type: text/plain\r\n\r\n'
+ 'broken base 64'
+ '--foo--'
+ )
+ _, form, files = formparser.parse_form_data(create_environ(data=data,
+ method='POST', content_type='multipart/form-data; boundary=foo'))
+ self.assert_(not files)
+ self.assert_(not form)
+
+ self.assert_raises(ValueError, formparser.parse_form_data,
+ create_environ(data=data, method='POST',
+ content_type='multipart/form-data; boundary=foo'),
+ silent=False)
+
+ def test_file_no_content_type(self):
+ """Chrome does not always provide a content type."""
+ data = (
+ '--foo\r\n'
+ 'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n\r\n'
+ 'file contents\r\n--foo--'
+ )
+ data = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ self.assert_equal(data.files['test'].filename, 'test.txt')
+ self.assert_equal(data.files['test'].read(), 'file contents')
+
+ def test_extra_newline(self):
+ """Test for multipart uploads with extra newlines"""
+ # this test looks innocent but it was actually timeing out in
+ # the Werkzeug 0.5 release version (#394)
+ data = (
+ '\r\n\r\n--foo\r\n'
+ 'Content-Disposition: form-data; name="foo"\r\n\r\n'
+ 'a string\r\n'
+ '--foo--'
+ )
+ data = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ self.assert_(not data.files)
+ self.assert_equal(data.form['foo'], 'a string')
+
+ def test_headers(self):
+ """Test access to multipart headers"""
+ data = ('--foo\r\n'
+ 'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
+ 'X-Custom-Header: blah\r\n'
+ 'Content-Type: text/plain; charset=utf-8\r\n\r\n'
+ 'file contents, just the contents\r\n'
+ '--foo--')
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ foo = req.files['foo']
+ self.assert_equal(foo.mimetype, 'text/plain')
+ self.assert_equal(foo.mimetype_params, {'charset': 'utf-8'})
+ self.assert_equal(foo.headers['content-type'], foo.content_type)
+ self.assert_equal(foo.content_type, 'text/plain; charset=utf-8')
+ self.assert_equal(foo.headers['x-custom-header'], 'blah')
+
+ def test_nonstandard_line_endings(self):
+ """Test nonstandard line endings of multipart form data"""
+ for nl in '\n', '\r', '\r\n':
+ data = nl.join((
+ '--foo',
+ 'Content-Disposition: form-data; name=foo',
+ '',
+ 'this is just bar',
+ '--foo',
+ 'Content-Disposition: form-data; name=bar',
+ '',
+ 'blafasel',
+ '--foo--'
+ ))
+ req = Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; '
+ 'boundary=foo', method='POST')
+ self.assert_equal(req.form['foo'], 'this is just bar')
+ self.assert_equal(req.form['bar'], 'blafasel')
+
+ def test_failures(self):
+ def parse_multipart(stream, boundary, content_length):
+ parser = formparser.MultiPartParser(content_length)
+ return parser.parse(stream, boundary, content_length)
+ self.assert_raises(ValueError, parse_multipart, StringIO(''), '', 0)
+ self.assert_raises(ValueError, parse_multipart, StringIO(''), 'broken ', 0)
+
+ data = '--foo\r\n\r\nHello World\r\n--foo--'
+ self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data))
+
+ data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n' \
+ 'Content-Transfer-Encoding: base64\r\n\r\nHello World\r\n--foo--'
+ self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data))
+
+ data = '--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\nHello World\r\n'
+ self.assert_raises(ValueError, parse_multipart, StringIO(data), 'foo', len(data))
+
+ x = formparser.parse_multipart_headers(['foo: bar\r\n', ' x test\r\n'])
+ self.assert_equal(x['foo'], 'bar\n x test')
+ self.assert_raises(ValueError, formparser.parse_multipart_headers,
+ ['foo: bar\r\n', ' x test'])
+
+ def test_bad_newline_bad_newline_assumption(self):
+ """Make sure we don't eat up stuff that is not a newline"""
+ class ISORequest(Request):
+ charset = 'latin1'
+ contents = 'U2vlbmUgbORu'
+ data = '--foo\r\nContent-Disposition: form-data; name="test"\r\n' \
+ 'Content-Transfer-Encoding: base64\r\n\r\n' + \
+ contents + '\r\n--foo--'
+ req = ISORequest.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ self.assert_equal(req.form['test'], u'Sk\xe5ne l\xe4n')
+
+
+class InternalFunctionsTestCase(WerkzeugTestCase):
+
+ def test_lien_parser(self):
+ assert formparser._line_parse('foo') == ('foo', False)
+ assert formparser._line_parse('foo\r\n') == ('foo', True)
+ assert formparser._line_parse('foo\r') == ('foo', True)
+ assert formparser._line_parse('foo\n') == ('foo', True)
+
+ def test_find_terminator(self):
+ lineiter = iter('\n\n\nfoo\nbar\nbaz'.splitlines(True))
+ find_terminator = formparser.MultiPartParser()._find_terminator
+ line = find_terminator(lineiter)
+ assert line == 'foo'
+ assert list(lineiter) == ['bar\n', 'baz']
+ assert find_terminator([]) == ''
+ assert find_terminator(['']) == ''
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(FormParserTestCase))
+ suite.addTest(unittest.makeSuite(MultiPartTestCase))
+ suite.addTest(unittest.makeSuite(InternalFunctionsTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/http.py b/websdk/werkzeug/testsuite/http.py
new file mode 100644
index 0000000..733aee2
--- /dev/null
+++ b/websdk/werkzeug/testsuite/http.py
@@ -0,0 +1,392 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.http
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ HTTP parsing utilities.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+from datetime import datetime
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import http, datastructures
+from werkzeug.test import create_environ
+
+
+class HTTPUtilityTestCase(WerkzeugTestCase):
+
+ def test_accept(self):
+ a = http.parse_accept_header('en-us,ru;q=0.5')
+ self.assert_equal(a.values(), ['en-us', 'ru'])
+ self.assert_equal(a.best, 'en-us')
+ self.assert_equal(a.find('ru'), 1)
+ self.assert_raises(ValueError, a.index, 'de')
+ self.assert_equal(a.to_header(), 'en-us,ru;q=0.5')
+
+ def test_mime_accept(self):
+ a = http.parse_accept_header('text/xml,application/xml,'
+ 'application/xhtml+xml,'
+ 'text/html;q=0.9,text/plain;q=0.8,'
+ 'image/png,*/*;q=0.5',
+ datastructures.MIMEAccept)
+ self.assert_raises(ValueError, lambda: a['missing'])
+ self.assert_equal(a['image/png'], 1)
+ self.assert_equal(a['text/plain'], 0.8)
+ self.assert_equal(a['foo/bar'], 0.5)
+ self.assert_equal(a[a.find('foo/bar')], ('*/*', 0.5))
+
+ def test_accept_matches(self):
+ a = http.parse_accept_header('text/xml,application/xml,application/xhtml+xml,'
+ 'text/html;q=0.9,text/plain;q=0.8,'
+ 'image/png', datastructures.MIMEAccept)
+ self.assert_equal(a.best_match(['text/html', 'application/xhtml+xml']),
+ 'application/xhtml+xml')
+ self.assert_equal(a.best_match(['text/html']), 'text/html')
+ self.assert_(a.best_match(['foo/bar']) is None)
+ self.assert_equal(a.best_match(['foo/bar', 'bar/foo'],
+ default='foo/bar'), 'foo/bar')
+ self.assert_equal(a.best_match(['application/xml', 'text/xml']), 'application/xml')
+
+ def test_charset_accept(self):
+ a = http.parse_accept_header('ISO-8859-1,utf-8;q=0.7,*;q=0.7',
+ datastructures.CharsetAccept)
+ self.assert_equal(a['iso-8859-1'], a['iso8859-1'])
+ self.assert_equal(a['iso-8859-1'], 1)
+ self.assert_equal(a['UTF8'], 0.7)
+ self.assert_equal(a['ebcdic'], 0.7)
+
+ def test_language_accept(self):
+ a = http.parse_accept_header('de-AT,de;q=0.8,en;q=0.5',
+ datastructures.LanguageAccept)
+ self.assert_equal(a.best, 'de-AT')
+ self.assert_('de_AT' in a)
+ self.assert_('en' in a)
+ self.assert_equal(a['de-at'], 1)
+ self.assert_equal(a['en'], 0.5)
+
+ def test_set_header(self):
+ hs = http.parse_set_header('foo, Bar, "Blah baz", Hehe')
+ self.assert_('blah baz' in hs)
+ self.assert_('foobar' not in hs)
+ self.assert_('foo' in hs)
+ self.assert_equal(list(hs), ['foo', 'Bar', 'Blah baz', 'Hehe'])
+ hs.add('Foo')
+ self.assert_equal(hs.to_header(), 'foo, Bar, "Blah baz", Hehe')
+
+ def test_list_header(self):
+ hl = http.parse_list_header('foo baz, blah')
+ self.assert_equal(hl, ['foo baz', 'blah'])
+
+ def test_dict_header(self):
+ d = http.parse_dict_header('foo="bar baz", blah=42')
+ self.assert_equal(d, {'foo': 'bar baz', 'blah': '42'})
+
+ def test_cache_control_header(self):
+ cc = http.parse_cache_control_header('max-age=0, no-cache')
+ assert cc.max_age == 0
+ assert cc.no_cache
+ cc = http.parse_cache_control_header('private, community="UCI"', None,
+ datastructures.ResponseCacheControl)
+ assert cc.private
+ assert cc['community'] == 'UCI'
+
+ c = datastructures.ResponseCacheControl()
+ assert c.no_cache is None
+ assert c.private is None
+ c.no_cache = True
+ assert c.no_cache == '*'
+ c.private = True
+ assert c.private == '*'
+ del c.private
+ assert c.private is None
+ assert c.to_header() == 'no-cache'
+
+ def test_authorization_header(self):
+ a = http.parse_authorization_header('Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==')
+ assert a.type == 'basic'
+ assert a.username == 'Aladdin'
+ assert a.password == 'open sesame'
+
+ a = http.parse_authorization_header('''Digest username="Mufasa",
+ realm="testrealm@host.invalid",
+ nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093",
+ uri="/dir/index.html",
+ qop=auth,
+ nc=00000001,
+ cnonce="0a4f113b",
+ response="6629fae49393a05397450978507c4ef1",
+ opaque="5ccc069c403ebaf9f0171e9517f40e41"''')
+ assert a.type == 'digest'
+ assert a.username == 'Mufasa'
+ assert a.realm == 'testrealm@host.invalid'
+ assert a.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093'
+ assert a.uri == '/dir/index.html'
+ assert 'auth' in a.qop
+ assert a.nc == '00000001'
+ assert a.cnonce == '0a4f113b'
+ assert a.response == '6629fae49393a05397450978507c4ef1'
+ assert a.opaque == '5ccc069c403ebaf9f0171e9517f40e41'
+
+ a = http.parse_authorization_header('''Digest username="Mufasa",
+ realm="testrealm@host.invalid",
+ nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093",
+ uri="/dir/index.html",
+ response="e257afa1414a3340d93d30955171dd0e",
+ opaque="5ccc069c403ebaf9f0171e9517f40e41"''')
+ assert a.type == 'digest'
+ assert a.username == 'Mufasa'
+ assert a.realm == 'testrealm@host.invalid'
+ assert a.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093'
+ assert a.uri == '/dir/index.html'
+ assert a.response == 'e257afa1414a3340d93d30955171dd0e'
+ assert a.opaque == '5ccc069c403ebaf9f0171e9517f40e41'
+
+ assert http.parse_authorization_header('') is None
+ assert http.parse_authorization_header(None) is None
+ assert http.parse_authorization_header('foo') is None
+
+ def test_www_authenticate_header(self):
+ wa = http.parse_www_authenticate_header('Basic realm="WallyWorld"')
+ assert wa.type == 'basic'
+ assert wa.realm == 'WallyWorld'
+ wa.realm = 'Foo Bar'
+ assert wa.to_header() == 'Basic realm="Foo Bar"'
+
+ wa = http.parse_www_authenticate_header('''Digest
+ realm="testrealm@host.com",
+ qop="auth,auth-int",
+ nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093",
+ opaque="5ccc069c403ebaf9f0171e9517f40e41"''')
+ assert wa.type == 'digest'
+ assert wa.realm == 'testrealm@host.com'
+ assert 'auth' in wa.qop
+ assert 'auth-int' in wa.qop
+ assert wa.nonce == 'dcd98b7102dd2f0e8b11d0f600bfb0c093'
+ assert wa.opaque == '5ccc069c403ebaf9f0171e9517f40e41'
+
+ wa = http.parse_www_authenticate_header('broken')
+ assert wa.type == 'broken'
+
+ assert not http.parse_www_authenticate_header('').type
+ assert not http.parse_www_authenticate_header('')
+
+ def test_etags(self):
+ assert http.quote_etag('foo') == '"foo"'
+ assert http.quote_etag('foo', True) == 'w/"foo"'
+ assert http.unquote_etag('"foo"') == ('foo', False)
+ assert http.unquote_etag('w/"foo"') == ('foo', True)
+ es = http.parse_etags('"foo", "bar", w/"baz", blar')
+ assert sorted(es) == ['bar', 'blar', 'foo']
+ assert 'foo' in es
+ assert 'baz' not in es
+ assert es.contains_weak('baz')
+ assert 'blar' in es
+ assert es.contains_raw('w/"baz"')
+ assert es.contains_raw('"foo"')
+ assert sorted(es.to_header().split(', ')) == ['"bar"', '"blar"', '"foo"', 'w/"baz"']
+
+ def test_parse_date(self):
+ assert http.parse_date('Sun, 06 Nov 1994 08:49:37 GMT ') == datetime(1994, 11, 6, 8, 49, 37)
+ assert http.parse_date('Sunday, 06-Nov-94 08:49:37 GMT') == datetime(1994, 11, 6, 8, 49, 37)
+ assert http.parse_date(' Sun Nov 6 08:49:37 1994') == datetime(1994, 11, 6, 8, 49, 37)
+ assert http.parse_date('foo') is None
+
+ def test_parse_date_overflows(self):
+ assert http.parse_date(' Sun 02 Feb 1343 08:49:37 GMT') == datetime(1343, 2, 2, 8, 49, 37)
+ assert http.parse_date('Thu, 01 Jan 1970 00:00:00 GMT') == datetime(1970, 1, 1, 0, 0)
+ assert http.parse_date('Thu, 33 Jan 1970 00:00:00 GMT') is None
+
+ def test_remove_entity_headers(self):
+ now = http.http_date()
+ headers1 = [('Date', now), ('Content-Type', 'text/html'), ('Content-Length', '0')]
+ headers2 = datastructures.Headers(headers1)
+
+ http.remove_entity_headers(headers1)
+ assert headers1 == [('Date', now)]
+
+ http.remove_entity_headers(headers2)
+ assert headers2 == datastructures.Headers([('Date', now)])
+
+ def test_remove_hop_by_hop_headers(self):
+ headers1 = [('Connection', 'closed'), ('Foo', 'bar'),
+ ('Keep-Alive', 'wtf')]
+ headers2 = datastructures.Headers(headers1)
+
+ http.remove_hop_by_hop_headers(headers1)
+ assert headers1 == [('Foo', 'bar')]
+
+ http.remove_hop_by_hop_headers(headers2)
+ assert headers2 == datastructures.Headers([('Foo', 'bar')])
+
+ def test_parse_options_header(self):
+ assert http.parse_options_header('something; foo="other\"thing"') == \
+ ('something', {'foo': 'other"thing'})
+ assert http.parse_options_header('something; foo="other\"thing"; meh=42') == \
+ ('something', {'foo': 'other"thing', 'meh': '42'})
+ assert http.parse_options_header('something; foo="other\"thing"; meh=42; bleh') == \
+ ('something', {'foo': 'other"thing', 'meh': '42', 'bleh': None})
+
+ def test_dump_options_header(self):
+ assert http.dump_options_header('foo', {'bar': 42}) == \
+ 'foo; bar=42'
+ assert http.dump_options_header('foo', {'bar': 42, 'fizz': None}) == \
+ 'foo; bar=42; fizz'
+
+ def test_dump_header(self):
+ assert http.dump_header([1, 2, 3]) == '1, 2, 3'
+ assert http.dump_header([1, 2, 3], allow_token=False) == '"1", "2", "3"'
+ assert http.dump_header({'foo': 'bar'}, allow_token=False) == 'foo="bar"'
+ assert http.dump_header({'foo': 'bar'}) == 'foo=bar'
+
+ def test_is_resource_modified(self):
+ env = create_environ()
+
+ # ignore POST
+ env['REQUEST_METHOD'] = 'POST'
+ assert not http.is_resource_modified(env, etag='testing')
+ env['REQUEST_METHOD'] = 'GET'
+
+ # etagify from data
+ self.assert_raises(TypeError, http.is_resource_modified, env,
+ data='42', etag='23')
+ env['HTTP_IF_NONE_MATCH'] = http.generate_etag('awesome')
+ assert not http.is_resource_modified(env, data='awesome')
+
+ env['HTTP_IF_MODIFIED_SINCE'] = http.http_date(datetime(2008, 1, 1, 12, 30))
+ assert not http.is_resource_modified(env,
+ last_modified=datetime(2008, 1, 1, 12, 00))
+ assert http.is_resource_modified(env,
+ last_modified=datetime(2008, 1, 1, 13, 00))
+
+ def test_date_formatting(self):
+ assert http.cookie_date(0) == 'Thu, 01-Jan-1970 00:00:00 GMT'
+ assert http.cookie_date(datetime(1970, 1, 1)) == 'Thu, 01-Jan-1970 00:00:00 GMT'
+ assert http.http_date(0) == 'Thu, 01 Jan 1970 00:00:00 GMT'
+ assert http.http_date(datetime(1970, 1, 1)) == 'Thu, 01 Jan 1970 00:00:00 GMT'
+
+ def test_cookies(self):
+ assert http.parse_cookie('dismiss-top=6; CP=null*; PHPSESSID=0a539d42abc001cd'
+ 'c762809248d4beed; a=42') == {
+ 'CP': u'null*',
+ 'PHPSESSID': u'0a539d42abc001cdc762809248d4beed',
+ 'a': u'42',
+ 'dismiss-top': u'6'
+ }
+ assert set(http.dump_cookie('foo', 'bar baz blub', 360, httponly=True,
+ sync_expires=False).split('; ')) == \
+ set(['HttpOnly', 'Max-Age=360', 'Path=/', 'foo="bar baz blub"'])
+ assert http.parse_cookie('fo234{=bar blub=Blah') == {'blub': 'Blah'}
+
+ def test_cookie_quoting(self):
+ val = http.dump_cookie("foo", "?foo")
+ assert val == 'foo="?foo"; Path=/'
+ assert http.parse_cookie(val) == {'foo': '?foo'}
+
+ assert http.parse_cookie(r'foo="foo\054bar"') == {'foo': 'foo,bar'}
+
+
+class RangeTestCase(WerkzeugTestCase):
+
+ def test_if_range_parsing(self):
+ rv = http.parse_if_range_header('"Test"')
+ assert rv.etag == 'Test'
+ assert rv.date is None
+ assert rv.to_header() == '"Test"'
+
+ # weak information is dropped
+ rv = http.parse_if_range_header('w/"Test"')
+ assert rv.etag == 'Test'
+ assert rv.date is None
+ assert rv.to_header() == '"Test"'
+
+ # broken etags are supported too
+ rv = http.parse_if_range_header('bullshit')
+ assert rv.etag == 'bullshit'
+ assert rv.date is None
+ assert rv.to_header() == '"bullshit"'
+
+ rv = http.parse_if_range_header('Thu, 01 Jan 1970 00:00:00 GMT')
+ assert rv.etag is None
+ assert rv.date == datetime(1970, 1, 1)
+ assert rv.to_header() == 'Thu, 01 Jan 1970 00:00:00 GMT'
+
+ for x in '', None:
+ rv = http.parse_if_range_header(x)
+ assert rv.etag is None
+ assert rv.date is None
+ assert rv.to_header() == ''
+
+ def test_range_parsing():
+ rv = http.parse_range_header('bytes=52')
+ assert rv is None
+
+ rv = http.parse_range_header('bytes=52-')
+ assert rv.units == 'bytes'
+ assert rv.ranges == [(52, None)]
+ assert rv.to_header() == 'bytes=52-'
+
+ rv = http.parse_range_header('bytes=52-99')
+ assert rv.units == 'bytes'
+ assert rv.ranges == [(52, 100)]
+ assert rv.to_header() == 'bytes=52-99'
+
+ rv = http.parse_range_header('bytes=52-99,-1000')
+ assert rv.units == 'bytes'
+ assert rv.ranges == [(52, 100), (-1000, None)]
+ assert rv.to_header() == 'bytes=52-99,-1000'
+
+ rv = http.parse_range_header('bytes = 1 - 100')
+ assert rv.units == 'bytes'
+ assert rv.ranges == [(1, 101)]
+ assert rv.to_header() == 'bytes=1-100'
+
+ rv = http.parse_range_header('AWesomes=0-999')
+ assert rv.units == 'awesomes'
+ assert rv.ranges == [(0, 1000)]
+ assert rv.to_header() == 'awesomes=0-999'
+
+ def test_content_range_parsing():
+ rv = http.parse_content_range_header('bytes 0-98/*')
+ assert rv.units == 'bytes'
+ assert rv.start == 0
+ assert rv.stop == 99
+ assert rv.length is None
+ assert rv.to_header() == 'bytes 0-98/*'
+
+ rv = http.parse_content_range_header('bytes 0-98/*asdfsa')
+ assert rv is None
+
+ rv = http.parse_content_range_header('bytes 0-99/100')
+ assert rv.to_header() == 'bytes 0-99/100'
+ rv.start = None
+ rv.stop = None
+ assert rv.units == 'bytes'
+ assert rv.to_header() == 'bytes */100'
+
+ rv = http.parse_content_range_header('bytes */100')
+ assert rv.start is None
+ assert rv.stop is None
+ assert rv.length == 100
+ assert rv.units == 'bytes'
+
+
+class RegressionTestCase(WerkzeugTestCase):
+
+ def test_best_match_works(self):
+ # was a bug in 0.6
+ rv = http.parse_accept_header('foo=,application/xml,application/xhtml+xml,'
+ 'text/html;q=0.9,text/plain;q=0.8,'
+ 'image/png,*/*;q=0.5',
+ datastructures.MIMEAccept).best_match(['foo/bar'])
+ self.assert_equal(rv, 'foo/bar')
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(HTTPUtilityTestCase))
+ suite.addTest(unittest.makeSuite(RegressionTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/internal.py b/websdk/werkzeug/testsuite/internal.py
new file mode 100644
index 0000000..1de1b5d
--- /dev/null
+++ b/websdk/werkzeug/testsuite/internal.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.internal
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Internal tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from datetime import datetime
+from warnings import filterwarnings, resetwarnings
+
+from werkzeug.testsuite import WerkzeugTestCase
+from werkzeug.wrappers import Request, Response
+
+from werkzeug import _internal as internal
+from werkzeug.test import create_environ
+
+
+class InternalTestCase(WerkzeugTestCase):
+
+ def test_date_to_unix(self):
+ assert internal._date_to_unix(datetime(1970, 1, 1)) == 0
+ assert internal._date_to_unix(datetime(1970, 1, 1, 1, 0, 0)) == 3600
+ assert internal._date_to_unix(datetime(1970, 1, 1, 1, 1, 1)) == 3661
+ x = datetime(2010, 2, 15, 16, 15, 39)
+ assert internal._date_to_unix(x) == 1266250539
+
+ def test_easteregg(self):
+ req = Request.from_values('/?macgybarchakku')
+ resp = Response.force_type(internal._easteregg(None), req)
+ assert 'About Werkzeug' in resp.data
+ assert 'the Swiss Army knife of Python web development' in resp.data
+
+ def test_wrapper_internals(self):
+ req = Request.from_values(data={'foo': 'bar'}, method='POST')
+ req._load_form_data()
+ assert req.form.to_dict() == {'foo': 'bar'}
+
+ # second call does not break
+ req._load_form_data()
+ assert req.form.to_dict() == {'foo': 'bar'}
+
+ # check reprs
+ assert repr(req) == "<Request 'http://localhost/' [POST]>"
+ resp = Response()
+ assert repr(resp) == '<Response 0 bytes [200 OK]>'
+ resp.data = 'Hello World!'
+ assert repr(resp) == '<Response 12 bytes [200 OK]>'
+ resp.response = iter(['Test'])
+ assert repr(resp) == '<Response streamed [200 OK]>'
+
+ # unicode data does not set content length
+ response = Response([u'Hällo Wörld'])
+ headers = response.get_wsgi_headers(create_environ())
+ assert 'Content-Length' not in headers
+
+ response = Response(['Hällo Wörld'])
+ headers = response.get_wsgi_headers(create_environ())
+ assert 'Content-Length' in headers
+
+ # check for internal warnings
+ filterwarnings('error', category=Warning)
+ response = Response()
+ environ = create_environ()
+ response.response = 'What the...?'
+ self.assert_raises(Warning, lambda: list(response.iter_encoded()))
+ self.assert_raises(Warning, lambda: list(response.get_app_iter(environ)))
+ response.direct_passthrough = True
+ self.assert_raises(Warning, lambda: list(response.iter_encoded()))
+ self.assert_raises(Warning, lambda: list(response.get_app_iter(environ)))
+ resetwarnings()
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(InternalTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/local.py b/websdk/werkzeug/testsuite/local.py
new file mode 100644
index 0000000..236e753
--- /dev/null
+++ b/websdk/werkzeug/testsuite/local.py
@@ -0,0 +1,133 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.local
+ ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Local and local proxy tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import time
+import unittest
+from threading import Thread
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import local
+
+
+class LocalTestCase(WerkzeugTestCase):
+
+ def test_basic_local(self):
+ l = local.Local()
+ l.foo = 0
+ values = []
+ def value_setter(idx):
+ time.sleep(0.01 * idx)
+ l.foo = idx
+ time.sleep(0.02)
+ values.append(l.foo)
+ threads = [Thread(target=value_setter, args=(x,))
+ for x in [1, 2, 3]]
+ for thread in threads:
+ thread.start()
+ time.sleep(0.2)
+ assert sorted(values) == [1, 2, 3]
+
+ def delfoo():
+ del l.foo
+ delfoo()
+ self.assert_raises(AttributeError, lambda: l.foo)
+ self.assert_raises(AttributeError, delfoo)
+
+ local.release_local(l)
+
+ def test_local_release(self):
+ loc = local.Local()
+ loc.foo = 42
+ local.release_local(loc)
+ assert not hasattr(loc, 'foo')
+
+ ls = local.LocalStack()
+ ls.push(42)
+ local.release_local(ls)
+ assert ls.top is None
+
+ def test_local_proxy(self):
+ foo = []
+ ls = local.LocalProxy(lambda: foo)
+ ls.append(42)
+ ls.append(23)
+ ls[1:] = [1, 2, 3]
+ assert foo == [42, 1, 2, 3]
+ assert repr(foo) == repr(ls)
+ assert foo[0] == 42
+ foo += [1]
+ assert list(foo) == [42, 1, 2, 3, 1]
+
+ def test_local_stack(self):
+ ident = local.get_ident()
+
+ ls = local.LocalStack()
+ assert ident not in ls._local.__storage__
+ assert ls.top is None
+ ls.push(42)
+ assert ident in ls._local.__storage__
+ assert ls.top == 42
+ ls.push(23)
+ assert ls.top == 23
+ ls.pop()
+ assert ls.top == 42
+ ls.pop()
+ assert ls.top is None
+ assert ls.pop() is None
+ assert ls.pop() is None
+
+ proxy = ls()
+ ls.push([1, 2])
+ assert proxy == [1, 2]
+ ls.push((1, 2))
+ assert proxy == (1, 2)
+ ls.pop()
+ ls.pop()
+ assert repr(proxy) == '<LocalProxy unbound>'
+
+ assert ident not in ls._local.__storage__
+
+ def test_local_proxies_with_callables(self):
+ foo = 42
+ ls = local.LocalProxy(lambda: foo)
+ assert ls == 42
+ foo = [23]
+ ls.append(42)
+ assert ls == [23, 42]
+ assert foo == [23, 42]
+
+ def test_custom_idents(self):
+ ident = 0
+ loc = local.Local()
+ stack = local.LocalStack()
+ mgr = local.LocalManager([loc, stack], ident_func=lambda: ident)
+
+ loc.foo = 42
+ stack.push({'foo': 42})
+ ident = 1
+ loc.foo = 23
+ stack.push({'foo': 23})
+ ident = 0
+ assert loc.foo == 42
+ assert stack.top['foo'] == 42
+ stack.pop()
+ assert stack.top is None
+ ident = 1
+ assert loc.foo == 23
+ assert stack.top['foo'] == 23
+ stack.pop()
+ assert stack.top is None
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(LocalTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/multipart/collect.py b/websdk/werkzeug/testsuite/multipart/collect.py
new file mode 100644
index 0000000..5fce5e9
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/collect.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+"""
+Hacky helper application to collect form data.
+"""
+from werkzeug.serving import run_simple
+from werkzeug.wrappers import Request, Response
+
+
+def copy_stream(request):
+ from os import mkdir
+ from time import time
+ folder = 'request-%d' % time()
+ mkdir(folder)
+ environ = request.environ
+ f = file(folder + '/request.txt', 'wb+')
+ f.write(environ['wsgi.input'].read(int(environ['CONTENT_LENGTH'])))
+ f.flush()
+ f.seek(0)
+ environ['wsgi.input'] = f
+ request.stat_folder = folder
+
+
+def stats(request):
+ copy_stream(request)
+ f1 = request.files['file1']
+ f2 = request.files['file2']
+ text = request.form['text']
+ f1.save(request.stat_folder + '/file1.bin')
+ f2.save(request.stat_folder + '/file2.bin')
+ file(request.stat_folder + '/text.txt', 'w').write(text.encode('utf-8'))
+ return Response('Done.')
+
+
+def upload_file(request):
+ return Response('''
+ <h1>Upload File</h1>
+ <form action="" method="post" enctype="multipart/form-data">
+ <input type="file" name="file1"><br>
+ <input type="file" name="file2"><br>
+ <textarea name="text"></textarea><br>
+ <input type="submit" value="Send">
+ </form>
+ ''', mimetype='text/html')
+
+
+def application(environ, start_responseonse):
+ request = Request(environ)
+ if request.method == 'POST':
+ response = stats(request)
+ else:
+ response = upload_file(request)
+ return response(environ, start_responseonse)
+
+
+if __name__ == '__main__':
+ run_simple('localhost', 5000, application, use_debugger=True)
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png
new file mode 100644
index 0000000..9b3422c
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file1.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png
new file mode 100644
index 0000000..fb2efb8
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/file2.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt
new file mode 100644
index 0000000..721e04e
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt
new file mode 100644
index 0000000..c87634d
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2png1txt/text.txt
@@ -0,0 +1 @@
+example text \ No newline at end of file
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png
new file mode 100644
index 0000000..89c8129
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file1.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png
new file mode 100644
index 0000000..6332fef
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/file2.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt
new file mode 100644
index 0000000..489290b
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt
new file mode 100644
index 0000000..3bf804d
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/firefox3-2pnglongtext/text.txt
@@ -0,0 +1,3 @@
+--long text
+--with boundary
+--lookalikes-- \ No newline at end of file
diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png
new file mode 100644
index 0000000..9b3422c
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file1.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png
new file mode 100644
index 0000000..fb2efb8
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/file2.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt
new file mode 100644
index 0000000..59fdeae
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt
new file mode 100644
index 0000000..7c465b7
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/ie6-2png1txt/text.txt
@@ -0,0 +1 @@
+ie6 sucks :-/ \ No newline at end of file
diff --git a/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt b/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt
new file mode 100644
index 0000000..acc4e2e
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/ie7_full_path_request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png
new file mode 100644
index 0000000..7542db1
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file1.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png
new file mode 100644
index 0000000..658c711
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/file2.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt
new file mode 100644
index 0000000..8f32591
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt
new file mode 100644
index 0000000..ca01cb0
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/opera8-2png1txt/text.txt
@@ -0,0 +1 @@
+blafasel öäü \ No newline at end of file
diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png
new file mode 100644
index 0000000..afca073
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file1.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png
new file mode 100644
index 0000000..2a7da6e
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/file2.png
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt
new file mode 100644
index 0000000..b4ce0ee
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/request.txt
Binary files differ
diff --git a/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt
new file mode 100644
index 0000000..baa1300
--- /dev/null
+++ b/websdk/werkzeug/testsuite/multipart/webkit3-2png1txt/text.txt
@@ -0,0 +1 @@
+this is another text with ümläüts \ No newline at end of file
diff --git a/websdk/werkzeug/testsuite/res/test.txt b/websdk/werkzeug/testsuite/res/test.txt
new file mode 100644
index 0000000..a8efdcc
--- /dev/null
+++ b/websdk/werkzeug/testsuite/res/test.txt
@@ -0,0 +1 @@
+FOUND
diff --git a/websdk/werkzeug/testsuite/routing.py b/websdk/werkzeug/testsuite/routing.py
new file mode 100644
index 0000000..77b3cb6
--- /dev/null
+++ b/websdk/werkzeug/testsuite/routing.py
@@ -0,0 +1,610 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.routing
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Routing tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import routing as r
+from werkzeug.wrappers import Response
+from werkzeug.datastructures import ImmutableDict
+from werkzeug.test import create_environ
+
+
+class RoutingTestCase(WerkzeugTestCase):
+
+ def test_basic_routing(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index'),
+ r.Rule('/foo', endpoint='foo'),
+ r.Rule('/bar/', endpoint='bar')
+ ])
+ adapter = map.bind('example.org', '/')
+ assert adapter.match('/') == ('index', {})
+ assert adapter.match('/foo') == ('foo', {})
+ assert adapter.match('/bar/') == ('bar', {})
+ self.assert_raises(r.RequestRedirect, lambda: adapter.match('/bar'))
+ self.assert_raises(r.NotFound, lambda: adapter.match('/blub'))
+
+ adapter = map.bind('example.org', '/test')
+ try:
+ adapter.match('/bar')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.org/test/bar/'
+ else:
+ self.fail('Expected request redirect')
+
+ adapter = map.bind('example.org', '/')
+ try:
+ adapter.match('/bar')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.org/bar/'
+ else:
+ self.fail('Expected request redirect')
+
+ adapter = map.bind('example.org', '/')
+ try:
+ adapter.match('/bar', query_args={'aha': 'muhaha'})
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.org/bar/?aha=muhaha'
+ else:
+ self.fail('Expected request redirect')
+
+ adapter = map.bind('example.org', '/')
+ try:
+ adapter.match('/bar', query_args='aha=muhaha')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.org/bar/?aha=muhaha'
+ else:
+ self.fail('Expected request redirect')
+
+ adapter = map.bind_to_environ(create_environ('/bar?foo=bar',
+ 'http://example.org/'))
+ try:
+ adapter.match()
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.org/bar/?foo=bar'
+ else:
+ self.fail('Expected request redirect')
+
+ def test_environ_defaults(self):
+ environ = create_environ("/foo")
+ self.assert_equal(environ["PATH_INFO"], '/foo')
+ m = r.Map([r.Rule("/foo", endpoint="foo"), r.Rule("/bar", endpoint="bar")])
+ a = m.bind_to_environ(environ)
+ self.assert_equal(a.match("/foo"), ('foo', {}))
+ self.assert_equal(a.match(), ('foo', {}))
+ self.assert_equal(a.match("/bar"), ('bar', {}))
+ self.assert_raises(r.NotFound, a.match, "/bars")
+
+ def test_basic_building(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index'),
+ r.Rule('/foo', endpoint='foo'),
+ r.Rule('/bar/<baz>', endpoint='bar'),
+ r.Rule('/bar/<int:bazi>', endpoint='bari'),
+ r.Rule('/bar/<float:bazf>', endpoint='barf'),
+ r.Rule('/bar/<path:bazp>', endpoint='barp'),
+ r.Rule('/hehe', endpoint='blah', subdomain='blah')
+ ])
+ adapter = map.bind('example.org', '/', subdomain='blah')
+
+ assert adapter.build('index', {}) == 'http://example.org/'
+ assert adapter.build('foo', {}) == 'http://example.org/foo'
+ assert adapter.build('bar', {'baz': 'blub'}) == 'http://example.org/bar/blub'
+ assert adapter.build('bari', {'bazi': 50}) == 'http://example.org/bar/50'
+ assert adapter.build('barf', {'bazf': 0.815}) == 'http://example.org/bar/0.815'
+ assert adapter.build('barp', {'bazp': 'la/di'}) == 'http://example.org/bar/la/di'
+ assert adapter.build('blah', {}) == '/hehe'
+ self.assert_raises(r.BuildError, lambda: adapter.build('urks'))
+
+ adapter = map.bind('example.org', '/test', subdomain='blah')
+ assert adapter.build('index', {}) == 'http://example.org/test/'
+ assert adapter.build('foo', {}) == 'http://example.org/test/foo'
+ assert adapter.build('bar', {'baz': 'blub'}) == 'http://example.org/test/bar/blub'
+ assert adapter.build('bari', {'bazi': 50}) == 'http://example.org/test/bar/50'
+ assert adapter.build('barf', {'bazf': 0.815}) == 'http://example.org/test/bar/0.815'
+ assert adapter.build('barp', {'bazp': 'la/di'}) == 'http://example.org/test/bar/la/di'
+ assert adapter.build('blah', {}) == '/test/hehe'
+
+ def test_defaults(self):
+ map = r.Map([
+ r.Rule('/foo/', defaults={'page': 1}, endpoint='foo'),
+ r.Rule('/foo/<int:page>', endpoint='foo')
+ ])
+ adapter = map.bind('example.org', '/')
+
+ assert adapter.match('/foo/') == ('foo', {'page': 1})
+ self.assert_raises(r.RequestRedirect, lambda: adapter.match('/foo/1'))
+ assert adapter.match('/foo/2') == ('foo', {'page': 2})
+ assert adapter.build('foo', {}) == '/foo/'
+ assert adapter.build('foo', {'page': 1}) == '/foo/'
+ assert adapter.build('foo', {'page': 2}) == '/foo/2'
+
+ def test_greedy(self):
+ map = r.Map([
+ r.Rule('/foo', endpoint='foo'),
+ r.Rule('/<path:bar>', endpoint='bar'),
+ r.Rule('/<path:bar>/<path:blub>', endpoint='bar')
+ ])
+ adapter = map.bind('example.org', '/')
+
+ assert adapter.match('/foo') == ('foo', {})
+ assert adapter.match('/blub') == ('bar', {'bar': 'blub'})
+ assert adapter.match('/he/he') == ('bar', {'bar': 'he', 'blub': 'he'})
+
+ assert adapter.build('foo', {}) == '/foo'
+ assert adapter.build('bar', {'bar': 'blub'}) == '/blub'
+ assert adapter.build('bar', {'bar': 'blub', 'blub': 'bar'}) == '/blub/bar'
+
+ def test_path(self):
+ map = r.Map([
+ r.Rule('/', defaults={'name': 'FrontPage'}, endpoint='page'),
+ r.Rule('/Special', endpoint='special'),
+ r.Rule('/<int:year>', endpoint='year'),
+ r.Rule('/<path:name>', endpoint='page'),
+ r.Rule('/<path:name>/edit', endpoint='editpage'),
+ r.Rule('/<path:name>/silly/<path:name2>', endpoint='sillypage'),
+ r.Rule('/<path:name>/silly/<path:name2>/edit', endpoint='editsillypage'),
+ r.Rule('/Talk:<path:name>', endpoint='talk'),
+ r.Rule('/User:<username>', endpoint='user'),
+ r.Rule('/User:<username>/<path:name>', endpoint='userpage'),
+ r.Rule('/Files/<path:file>', endpoint='files'),
+ ])
+ adapter = map.bind('example.org', '/')
+
+ assert adapter.match('/') == ('page', {'name':'FrontPage'})
+ self.assert_raises(r.RequestRedirect, lambda: adapter.match('/FrontPage'))
+ assert adapter.match('/Special') == ('special', {})
+ assert adapter.match('/2007') == ('year', {'year':2007})
+ assert adapter.match('/Some/Page') == ('page', {'name':'Some/Page'})
+ assert adapter.match('/Some/Page/edit') == ('editpage', {'name':'Some/Page'})
+ assert adapter.match('/Foo/silly/bar') == ('sillypage', {'name':'Foo', 'name2':'bar'})
+ assert adapter.match('/Foo/silly/bar/edit') == ('editsillypage', {'name':'Foo', 'name2':'bar'})
+ assert adapter.match('/Talk:Foo/Bar') == ('talk', {'name':'Foo/Bar'})
+ assert adapter.match('/User:thomas') == ('user', {'username':'thomas'})
+ assert adapter.match('/User:thomas/projects/werkzeug') == \
+ ('userpage', {'username':'thomas', 'name':'projects/werkzeug'})
+ assert adapter.match('/Files/downloads/werkzeug/0.2.zip') == \
+ ('files', {'file':'downloads/werkzeug/0.2.zip'})
+
+ def test_dispatch(self):
+ env = create_environ('/')
+ map = r.Map([
+ r.Rule('/', endpoint='root'),
+ r.Rule('/foo/', endpoint='foo')
+ ])
+ adapter = map.bind_to_environ(env)
+
+ raise_this = None
+ def view_func(endpoint, values):
+ if raise_this is not None:
+ raise raise_this
+ return Response(repr((endpoint, values)))
+ dispatch = lambda p, q=False: Response.force_type(adapter.dispatch(view_func, p,
+ catch_http_exceptions=q), env)
+
+ assert dispatch('/').data == "('root', {})"
+ assert dispatch('/foo').status_code == 301
+ raise_this = r.NotFound()
+ self.assert_raises(r.NotFound, lambda: dispatch('/bar'))
+ assert dispatch('/bar', True).status_code == 404
+
+ def test_http_host_before_server_name(self):
+ env = {
+ 'HTTP_HOST': 'wiki.example.com',
+ 'SERVER_NAME': 'web0.example.com',
+ 'SERVER_PORT': '80',
+ 'SCRIPT_NAME': '',
+ 'PATH_INFO': '',
+ 'REQUEST_METHOD': 'GET',
+ 'wsgi.url_scheme': 'http'
+ }
+ map = r.Map([r.Rule('/', endpoint='index', subdomain='wiki')])
+ adapter = map.bind_to_environ(env, server_name='example.com')
+ assert adapter.match('/') == ('index', {})
+ assert adapter.build('index', force_external=True) == 'http://wiki.example.com/'
+ assert adapter.build('index') == '/'
+
+ env['HTTP_HOST'] = 'admin.example.com'
+ adapter = map.bind_to_environ(env, server_name='example.com')
+ assert adapter.build('index') == 'http://wiki.example.com/'
+
+ def test_adapter_url_parameter_sorting(self):
+ map = r.Map([r.Rule('/', endpoint='index')], sort_parameters=True,
+ sort_key=lambda x: x[1])
+ adapter = map.bind('localhost', '/')
+ assert adapter.build('index', {'x': 20, 'y': 10, 'z': 30},
+ force_external=True) == 'http://localhost/?y=10&x=20&z=30'
+
+ def test_request_direct_charset_bug(self):
+ map = r.Map([r.Rule(u'/öäü/')])
+ adapter = map.bind('localhost', '/')
+ try:
+ adapter.match(u'/öäü')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://localhost/%C3%B6%C3%A4%C3%BC/'
+ else:
+ self.fail('expected request redirect exception')
+
+ def test_request_redirect_default(self):
+ map = r.Map([r.Rule(u'/foo', defaults={'bar': 42}),
+ r.Rule(u'/foo/<int:bar>')])
+ adapter = map.bind('localhost', '/')
+ try:
+ adapter.match(u'/foo/42')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://localhost/foo'
+ else:
+ self.fail('expected request redirect exception')
+
+ def test_request_redirect_default_subdomain(self):
+ map = r.Map([r.Rule(u'/foo', defaults={'bar': 42}, subdomain='test'),
+ r.Rule(u'/foo/<int:bar>', subdomain='other')])
+ adapter = map.bind('localhost', '/', subdomain='other')
+ try:
+ adapter.match(u'/foo/42')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://test.localhost/foo'
+ else:
+ self.fail('expected request redirect exception')
+
+ def test_adapter_match_return_rule(self):
+ rule = r.Rule('/foo/', endpoint='foo')
+ map = r.Map([rule])
+ adapter = map.bind('localhost', '/')
+ assert adapter.match('/foo/', return_rule=True) == (rule, {})
+
+ def test_server_name_interpolation(self):
+ server_name = 'example.invalid'
+ map = r.Map([r.Rule('/', endpoint='index'),
+ r.Rule('/', endpoint='alt', subdomain='alt')])
+
+ env = create_environ('/', 'http://%s/' % server_name)
+ adapter = map.bind_to_environ(env, server_name=server_name)
+ assert adapter.match() == ('index', {})
+
+ env = create_environ('/', 'http://alt.%s/' % server_name)
+ adapter = map.bind_to_environ(env, server_name=server_name)
+ assert adapter.match() == ('alt', {})
+
+ env = create_environ('/', 'http://%s/' % server_name)
+ adapter = map.bind_to_environ(env, server_name='foo')
+ assert adapter.subdomain == '<invalid>'
+
+ def test_rule_emptying(self):
+ rule = r.Rule('/foo', {'meh': 'muh'}, 'x', ['POST'],
+ False, 'x', True, None)
+ rule2 = rule.empty()
+ assert rule.__dict__ == rule2.__dict__
+ rule.methods.add('GET')
+ assert rule.__dict__ != rule2.__dict__
+ rule.methods.discard('GET')
+ rule.defaults['meh'] = 'aha'
+ assert rule.__dict__ != rule2.__dict__
+
+ def test_rule_templates(self):
+ testcase = r.RuleTemplate(
+ [ r.Submount('/test/$app',
+ [ r.Rule('/foo/', endpoint='handle_foo')
+ , r.Rule('/bar/', endpoint='handle_bar')
+ , r.Rule('/baz/', endpoint='handle_baz')
+ ]),
+ r.EndpointPrefix('${app}',
+ [ r.Rule('/${app}-blah', endpoint='bar')
+ , r.Rule('/${app}-meh', endpoint='baz')
+ ]),
+ r.Subdomain('$app',
+ [ r.Rule('/blah', endpoint='x_bar')
+ , r.Rule('/meh', endpoint='x_baz')
+ ])
+ ])
+
+ url_map = r.Map(
+ [ testcase(app='test1')
+ , testcase(app='test2')
+ , testcase(app='test3')
+ , testcase(app='test4')
+ ])
+
+ out = sorted([(x.rule, x.subdomain, x.endpoint)
+ for x in url_map.iter_rules()])
+
+ assert out == ([
+ ('/blah', 'test1', 'x_bar'),
+ ('/blah', 'test2', 'x_bar'),
+ ('/blah', 'test3', 'x_bar'),
+ ('/blah', 'test4', 'x_bar'),
+ ('/meh', 'test1', 'x_baz'),
+ ('/meh', 'test2', 'x_baz'),
+ ('/meh', 'test3', 'x_baz'),
+ ('/meh', 'test4', 'x_baz'),
+ ('/test/test1/bar/', '', 'handle_bar'),
+ ('/test/test1/baz/', '', 'handle_baz'),
+ ('/test/test1/foo/', '', 'handle_foo'),
+ ('/test/test2/bar/', '', 'handle_bar'),
+ ('/test/test2/baz/', '', 'handle_baz'),
+ ('/test/test2/foo/', '', 'handle_foo'),
+ ('/test/test3/bar/', '', 'handle_bar'),
+ ('/test/test3/baz/', '', 'handle_baz'),
+ ('/test/test3/foo/', '', 'handle_foo'),
+ ('/test/test4/bar/', '', 'handle_bar'),
+ ('/test/test4/baz/', '', 'handle_baz'),
+ ('/test/test4/foo/', '', 'handle_foo'),
+ ('/test1-blah', '', 'test1bar'),
+ ('/test1-meh', '', 'test1baz'),
+ ('/test2-blah', '', 'test2bar'),
+ ('/test2-meh', '', 'test2baz'),
+ ('/test3-blah', '', 'test3bar'),
+ ('/test3-meh', '', 'test3baz'),
+ ('/test4-blah', '', 'test4bar'),
+ ('/test4-meh', '', 'test4baz')
+ ])
+
+ def test_complex_routing_rules(self):
+ m = r.Map([
+ r.Rule('/', endpoint='index'),
+ r.Rule('/<int:blub>', endpoint='an_int'),
+ r.Rule('/<blub>', endpoint='a_string'),
+ r.Rule('/foo/', endpoint='nested'),
+ r.Rule('/foobar/', endpoint='nestedbar'),
+ r.Rule('/foo/<path:testing>/', endpoint='nested_show'),
+ r.Rule('/foo/<path:testing>/edit', endpoint='nested_edit'),
+ r.Rule('/users/', endpoint='users', defaults={'page': 1}),
+ r.Rule('/users/page/<int:page>', endpoint='users'),
+ r.Rule('/foox', endpoint='foox'),
+ r.Rule('/<path:bar>/<path:blub>', endpoint='barx_path_path')
+ ])
+ a = m.bind('example.com')
+
+ assert a.match('/') == ('index', {})
+ assert a.match('/42') == ('an_int', {'blub': 42})
+ assert a.match('/blub') == ('a_string', {'blub': 'blub'})
+ assert a.match('/foo/') == ('nested', {})
+ assert a.match('/foobar/') == ('nestedbar', {})
+ assert a.match('/foo/1/2/3/') == ('nested_show', {'testing': '1/2/3'})
+ assert a.match('/foo/1/2/3/edit') == ('nested_edit', {'testing': '1/2/3'})
+ assert a.match('/users/') == ('users', {'page': 1})
+ assert a.match('/users/page/2') == ('users', {'page': 2})
+ assert a.match('/foox') == ('foox', {})
+ assert a.match('/1/2/3') == ('barx_path_path', {'bar': '1', 'blub': '2/3'})
+
+ assert a.build('index') == '/'
+ assert a.build('an_int', {'blub': 42}) == '/42'
+ assert a.build('a_string', {'blub': 'test'}) == '/test'
+ assert a.build('nested') == '/foo/'
+ assert a.build('nestedbar') == '/foobar/'
+ assert a.build('nested_show', {'testing': '1/2/3'}) == '/foo/1/2/3/'
+ assert a.build('nested_edit', {'testing': '1/2/3'}) == '/foo/1/2/3/edit'
+ assert a.build('users', {'page': 1}) == '/users/'
+ assert a.build('users', {'page': 2}) == '/users/page/2'
+ assert a.build('foox') == '/foox'
+ assert a.build('barx_path_path', {'bar': '1', 'blub': '2/3'}) == '/1/2/3'
+
+ def test_default_converters(self):
+ class MyMap(r.Map):
+ default_converters = r.Map.default_converters.copy()
+ default_converters['foo'] = r.UnicodeConverter
+ assert isinstance(r.Map.default_converters, ImmutableDict)
+ m = MyMap([
+ r.Rule('/a/<foo:a>', endpoint='a'),
+ r.Rule('/b/<foo:b>', endpoint='b'),
+ r.Rule('/c/<c>', endpoint='c')
+ ], converters={'bar': r.UnicodeConverter})
+ a = m.bind('example.org', '/')
+ assert a.match('/a/1') == ('a', {'a': '1'})
+ assert a.match('/b/2') == ('b', {'b': '2'})
+ assert a.match('/c/3') == ('c', {'c': '3'})
+ assert 'foo' not in r.Map.default_converters
+
+ def test_build_append_unknown(self):
+ map = r.Map([
+ r.Rule('/bar/<float:bazf>', endpoint='barf')
+ ])
+ adapter = map.bind('example.org', '/', subdomain='blah')
+ assert adapter.build('barf', {'bazf': 0.815, 'bif' : 1.0}) == \
+ 'http://example.org/bar/0.815?bif=1.0'
+ assert adapter.build('barf', {'bazf': 0.815, 'bif' : 1.0},
+ append_unknown=False) == 'http://example.org/bar/0.815'
+
+ def test_method_fallback(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index', methods=['GET']),
+ r.Rule('/<name>', endpoint='hello_name', methods=['GET']),
+ r.Rule('/select', endpoint='hello_select', methods=['POST']),
+ r.Rule('/search_get', endpoint='search', methods=['GET']),
+ r.Rule('/search_post', endpoint='search', methods=['POST'])
+ ])
+ adapter = map.bind('example.com')
+ assert adapter.build('index') == '/'
+ assert adapter.build('index', method='GET') == '/'
+ assert adapter.build('hello_name', {'name': 'foo'}) == '/foo'
+ assert adapter.build('hello_select') == '/select'
+ assert adapter.build('hello_select', method='POST') == '/select'
+ assert adapter.build('search') == '/search_get'
+ assert adapter.build('search', method='GET') == '/search_get'
+ assert adapter.build('search', method='POST') == '/search_post'
+
+ def test_implicit_head(self):
+ url_map = r.Map([
+ r.Rule('/get', methods=['GET'], endpoint='a'),
+ r.Rule('/post', methods=['POST'], endpoint='b')
+ ])
+ adapter = url_map.bind('example.org')
+ assert adapter.match('/get', method='HEAD') == ('a', {})
+ self.assert_raises(r.MethodNotAllowed, adapter.match,
+ '/post', method='HEAD')
+
+ def test_protocol_joining_bug(self):
+ m = r.Map([r.Rule('/<foo>', endpoint='x')])
+ a = m.bind('example.org')
+ assert a.build('x', {'foo': 'x:y'}) == '/x:y'
+ assert a.build('x', {'foo': 'x:y'}, force_external=True) == \
+ 'http://example.org/x:y'
+
+ def test_allowed_methods_querying(self):
+ m = r.Map([r.Rule('/<foo>', methods=['GET', 'HEAD']),
+ r.Rule('/foo', methods=['POST'])])
+ a = m.bind('example.org')
+ assert sorted(a.allowed_methods('/foo')) == ['GET', 'HEAD', 'POST']
+
+ def test_external_building_with_port(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index'),
+ ])
+ adapter = map.bind('example.org:5000', '/')
+ built_url = adapter.build('index', {}, force_external=True)
+ assert built_url == 'http://example.org:5000/', built_url
+
+ def test_external_building_with_port_bind_to_environ(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index'),
+ ])
+ adapter = map.bind_to_environ(
+ create_environ('/', 'http://example.org:5000/'),
+ server_name="example.org:5000"
+ )
+ built_url = adapter.build('index', {}, force_external=True)
+ assert built_url == 'http://example.org:5000/', built_url
+
+ def test_external_building_with_port_bind_to_environ_wrong_servername(self):
+ map = r.Map([
+ r.Rule('/', endpoint='index'),
+ ])
+ environ = create_environ('/', 'http://example.org:5000/')
+ adapter = map.bind_to_environ(environ, server_name="example.org")
+ assert adapter.subdomain == '<invalid>'
+
+ def test_converter_parser(self):
+ args, kwargs = r.parse_converter_args(u'test, a=1, b=3.0')
+
+ assert args == ('test',)
+ assert kwargs == {'a': 1, 'b': 3.0 }
+
+ args, kwargs = r.parse_converter_args('')
+ assert not args and not kwargs
+
+ args, kwargs = r.parse_converter_args('a, b, c,')
+ assert args == ('a', 'b', 'c')
+ assert not kwargs
+
+ args, kwargs = r.parse_converter_args('True, False, None')
+ assert args == (True, False, None)
+
+ args, kwargs = r.parse_converter_args('"foo", u"bar"')
+ assert args == ('foo', 'bar')
+
+ def test_alias_redirects(self):
+ m = r.Map([
+ r.Rule('/', endpoint='index'),
+ r.Rule('/index.html', endpoint='index', alias=True),
+ r.Rule('/users/', defaults={'page': 1}, endpoint='users'),
+ r.Rule('/users/index.html', defaults={'page': 1}, alias=True,
+ endpoint='users'),
+ r.Rule('/users/page/<int:page>', endpoint='users'),
+ r.Rule('/users/page-<int:page>.html', alias=True, endpoint='users'),
+ ])
+ a = m.bind('example.com')
+
+ def ensure_redirect(path, new_url, args=None):
+ try:
+ a.match(path, query_args=args)
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://example.com' + new_url
+ else:
+ assert False, 'expected redirect'
+
+ ensure_redirect('/index.html', '/')
+ ensure_redirect('/users/index.html', '/users/')
+ ensure_redirect('/users/page-2.html', '/users/page/2')
+ ensure_redirect('/users/page-1.html', '/users/')
+ ensure_redirect('/users/page-1.html', '/users/?foo=bar', {'foo': 'bar'})
+
+ assert a.build('index') == '/'
+ assert a.build('users', {'page': 1}) == '/users/'
+ assert a.build('users', {'page': 2}) == '/users/page/2'
+
+ def test_double_defaults(self):
+ for prefix in '', '/aaa':
+ m = r.Map([
+ r.Rule(prefix + '/', defaults={'foo': 1, 'bar': False}, endpoint='x'),
+ r.Rule(prefix + '/<int:foo>', defaults={'bar': False}, endpoint='x'),
+ r.Rule(prefix + '/bar/', defaults={'foo': 1, 'bar': True}, endpoint='x'),
+ r.Rule(prefix + '/bar/<int:foo>', defaults={'bar': True}, endpoint='x')
+ ])
+ a = m.bind('example.com')
+
+ assert a.match(prefix + '/') == ('x', {'foo': 1, 'bar': False})
+ assert a.match(prefix + '/2') == ('x', {'foo': 2, 'bar': False})
+ assert a.match(prefix + '/bar/') == ('x', {'foo': 1, 'bar': True})
+ assert a.match(prefix + '/bar/2') == ('x', {'foo': 2, 'bar': True})
+
+ assert a.build('x', {'foo': 1, 'bar': False}) == prefix + '/'
+ assert a.build('x', {'foo': 2, 'bar': False}) == prefix + '/2'
+ assert a.build('x', {'bar': False}) == prefix + '/'
+ assert a.build('x', {'foo': 1, 'bar': True}) == prefix + '/bar/'
+ assert a.build('x', {'foo': 2, 'bar': True}) == prefix + '/bar/2'
+ assert a.build('x', {'bar': True}) == prefix + '/bar/'
+
+ def test_host_matching(self):
+ m = r.Map([
+ r.Rule('/', endpoint='index', host='www.<domain>'),
+ r.Rule('/', endpoint='files', host='files.<domain>'),
+ r.Rule('/foo/', defaults={'page': 1}, host='www.<domain>', endpoint='x'),
+ r.Rule('/<int:page>', host='files.<domain>', endpoint='x')
+ ], host_matching=True)
+
+ a = m.bind('www.example.com')
+ assert a.match('/') == ('index', {'domain': 'example.com'})
+ assert a.match('/foo/') == ('x', {'domain': 'example.com', 'page': 1})
+ try:
+ a.match('/foo')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://www.example.com/foo/'
+ else:
+ assert False, 'expected redirect'
+
+ a = m.bind('files.example.com')
+ assert a.match('/') == ('files', {'domain': 'example.com'})
+ assert a.match('/2') == ('x', {'domain': 'example.com', 'page': 2})
+ try:
+ a.match('/1')
+ except r.RequestRedirect, e:
+ assert e.new_url == 'http://www.example.com/foo/'
+ else:
+ assert False, 'expected redirect'
+
+ def test_server_name_casing(self):
+ m = r.Map([
+ r.Rule('/', endpoint='index', subdomain='foo')
+ ])
+
+ env = create_environ()
+ env['SERVER_NAME'] = env['HTTP_HOST'] = 'FOO.EXAMPLE.COM'
+ a = m.bind_to_environ(env, server_name='example.com')
+ assert a.match('/') == ('index', {})
+
+ env = create_environ()
+ env['SERVER_NAME'] = '127.0.0.1'
+ env['SERVER_PORT'] = '5000'
+ del env['HTTP_HOST']
+ a = m.bind_to_environ(env, server_name='example.com')
+ try:
+ a.match()
+ except r.NotFound:
+ pass
+ else:
+ assert False, 'Expected not found exception'
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(RoutingTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/security.py b/websdk/werkzeug/testsuite/security.py
new file mode 100644
index 0000000..bdd0b34
--- /dev/null
+++ b/websdk/werkzeug/testsuite/security.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.security
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the security helpers.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import os
+import unittest
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.security import check_password_hash, generate_password_hash, \
+ safe_join
+
+
+class SecurityTestCase(WerkzeugTestCase):
+
+ def test_password_hashing(self):
+ """Test the password hashing and password hash checking"""
+ hash1 = generate_password_hash('default')
+ hash2 = generate_password_hash(u'default', method='sha1')
+ assert hash1 != hash2
+ assert check_password_hash(hash1, 'default')
+ assert check_password_hash(hash2, 'default')
+ assert hash1.startswith('sha1$')
+ assert hash2.startswith('sha1$')
+
+ fakehash = generate_password_hash('default', method='plain')
+ assert fakehash == 'plain$$default'
+ assert check_password_hash(fakehash, 'default')
+
+ mhash = generate_password_hash(u'default', method='md5')
+ assert mhash.startswith('md5$')
+ assert check_password_hash(mhash, 'default')
+
+ legacy = 'md5$$c21f969b5f03d33d43e04f8f136e7682'
+ assert check_password_hash(legacy, 'default')
+
+ legacy = u'md5$$c21f969b5f03d33d43e04f8f136e7682'
+ assert check_password_hash(legacy, 'default')
+
+ def test_safe_join(self):
+ """Test the safe joining helper"""
+ assert safe_join('foo', 'bar/baz') == os.path.join('foo', 'bar/baz')
+ assert safe_join('foo', '../bar/baz') is None
+ if os.name == 'nt':
+ assert safe_join('foo', 'foo\\bar') is None
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(SecurityTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/serving.py b/websdk/werkzeug/testsuite/serving.py
new file mode 100644
index 0000000..e26271d
--- /dev/null
+++ b/websdk/werkzeug/testsuite/serving.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.serving
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Added serving tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import sys
+import time
+import urllib
+import unittest
+from functools import update_wrapper
+from StringIO import StringIO
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import __version__ as version, serving
+from werkzeug.testapp import test_app
+from threading import Thread
+
+
+real_make_server = serving.make_server
+
+
+def silencestderr(f):
+ def new_func(*args, **kwargs):
+ old_stderr = sys.stderr
+ sys.stderr = StringIO()
+ try:
+ return f(*args, **kwargs)
+ finally:
+ sys.stderr = old_stderr
+ return update_wrapper(new_func, f)
+
+
+def run_dev_server(application):
+ servers = []
+ def tracking_make_server(*args, **kwargs):
+ srv = real_make_server(*args, **kwargs)
+ servers.append(srv)
+ return srv
+ serving.make_server = tracking_make_server
+ try:
+ t = Thread(target=serving.run_simple, args=('localhost', 0, application))
+ t.setDaemon(True)
+ t.start()
+ time.sleep(0.25)
+ finally:
+ serving.make_server = real_make_server
+ if not servers:
+ return None, None
+ server ,= servers
+ ip, port = server.socket.getsockname()[:2]
+ if ':' in ip:
+ ip = '[%s]' % ip
+ return server, '%s:%d' % (ip, port)
+
+
+class ServingTestCase(WerkzeugTestCase):
+
+ @silencestderr
+ def test_serving(self):
+ server, addr = run_dev_server(test_app)
+ rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read()
+ assert 'WSGI Information' in rv
+ assert 'foo=bar&amp;baz=blah' in rv
+ assert ('Werkzeug/%s' % version) in rv
+
+ @silencestderr
+ def test_broken_app(self):
+ def broken_app(environ, start_response):
+ 1/0
+ server, addr = run_dev_server(broken_app)
+ rv = urllib.urlopen('http://%s/?foo=bar&baz=blah' % addr).read()
+ assert 'Internal Server Error' in rv
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ServingTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/test.py b/websdk/werkzeug/testsuite/test.py
new file mode 100644
index 0000000..795de90
--- /dev/null
+++ b/websdk/werkzeug/testsuite/test.py
@@ -0,0 +1,376 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.test
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the testing tools.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import sys
+import unittest
+from cStringIO import StringIO, OutputType
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.wrappers import Request, Response, BaseResponse
+from werkzeug.test import Client, EnvironBuilder, create_environ, \
+ ClientRedirectError, stream_encode_multipart, run_wsgi_app
+from werkzeug.utils import redirect
+from werkzeug.formparser import parse_form_data
+from werkzeug.datastructures import MultiDict
+
+
+def cookie_app(environ, start_response):
+ """A WSGI application which sets a cookie, and returns as a ersponse any
+ cookie which exists.
+ """
+ response = Response(environ.get('HTTP_COOKIE', 'No Cookie'),
+ mimetype='text/plain')
+ response.set_cookie('test', 'test')
+ return response(environ, start_response)
+
+
+def redirect_loop_app(environ, start_response):
+ response = redirect('http://localhost/some/redirect/')
+ return response(environ, start_response)
+
+
+def redirect_with_get_app(environ, start_response):
+ req = Request(environ)
+ if req.url not in ('http://localhost/',
+ 'http://localhost/first/request',
+ 'http://localhost/some/redirect/'):
+ assert False, 'redirect_demo_app() did not expect URL "%s"' % req.url
+ if '/some/redirect' not in req.url:
+ response = redirect('http://localhost/some/redirect/')
+ else:
+ response = Response('current url: %s' % req.url)
+ return response(environ, start_response)
+
+
+def redirect_with_post_app(environ, start_response):
+ req = Request(environ)
+ if req.url == 'http://localhost/some/redirect/':
+ assert req.method == 'GET', 'request should be GET'
+ assert not req.form, 'request should not have data'
+ response = Response('current url: %s' % req.url)
+ else:
+ response = redirect('http://localhost/some/redirect/')
+ return response(environ, start_response)
+
+
+def external_redirect_demo_app(environ, start_response):
+ response = redirect('http://example.com/')
+ return response(environ, start_response)
+
+
+def external_subdomain_redirect_demo_app(environ, start_response):
+ if 'test.example.com' in environ['HTTP_HOST']:
+ response = Response('redirected successfully to subdomain')
+ else:
+ response = redirect('http://test.example.com/login')
+ return response(environ, start_response)
+
+
+def multi_value_post_app(environ, start_response):
+ req = Request(environ)
+ assert req.form['field'] == 'val1', req.form['field']
+ assert req.form.getlist('field') == ['val1', 'val2'], req.form.getlist('field')
+ response = Response('ok')
+ return response(environ, start_response)
+
+
+class TestTestCase(WerkzeugTestCase):
+
+ def test_cookie_forging(self):
+ c = Client(cookie_app)
+ c.set_cookie('localhost', 'foo', 'bar')
+ appiter, code, headers = c.open()
+ assert list(appiter) == ['foo=bar']
+
+ def test_set_cookie_app(self):
+ c = Client(cookie_app)
+ appiter, code, headers = c.open()
+ assert 'Set-Cookie' in dict(headers)
+
+ def test_cookiejar_stores_cookie(self):
+ c = Client(cookie_app)
+ appiter, code, headers = c.open()
+ assert 'test' in c.cookie_jar._cookies['localhost.local']['/']
+
+ def test_no_initial_cookie(self):
+ c = Client(cookie_app)
+ appiter, code, headers = c.open()
+ assert ''.join(appiter) == 'No Cookie'
+
+ def test_resent_cookie(self):
+ c = Client(cookie_app)
+ c.open()
+ appiter, code, headers = c.open()
+ assert ''.join(appiter) == 'test=test'
+
+ def test_disable_cookies(self):
+ c = Client(cookie_app, use_cookies=False)
+ c.open()
+ appiter, code, headers = c.open()
+ assert ''.join(appiter) == 'No Cookie'
+
+ def test_cookie_for_different_path(self):
+ c = Client(cookie_app)
+ c.open('/path1')
+ appiter, code, headers = c.open('/path2')
+ assert ''.join(appiter) == 'test=test'
+
+ def test_environ_builder_basics(self):
+ b = EnvironBuilder()
+ assert b.content_type is None
+ b.method = 'POST'
+ assert b.content_type == 'application/x-www-form-urlencoded'
+ b.files.add_file('test', StringIO('test contents'), 'test.txt')
+ assert b.files['test'].content_type == 'text/plain'
+ assert b.content_type == 'multipart/form-data'
+ b.form['test'] = 'normal value'
+
+ req = b.get_request()
+ b.close()
+
+ assert req.url == 'http://localhost/'
+ assert req.method == 'POST'
+ assert req.form['test'] == 'normal value'
+ assert req.files['test'].content_type == 'text/plain'
+ assert req.files['test'].filename == 'test.txt'
+ assert req.files['test'].read() == 'test contents'
+
+ def test_environ_builder_headers(self):
+ b = EnvironBuilder(environ_base={'HTTP_USER_AGENT': 'Foo/0.1'},
+ environ_overrides={'wsgi.version': (1, 1)})
+ b.headers['X-Suck-My-Dick'] = 'very well sir'
+ env = b.get_environ()
+ assert env['HTTP_USER_AGENT'] == 'Foo/0.1'
+ assert env['HTTP_X_SUCK_MY_DICK'] == 'very well sir'
+ assert env['wsgi.version'] == (1, 1)
+
+ b.headers['User-Agent'] = 'Bar/1.0'
+ env = b.get_environ()
+ assert env['HTTP_USER_AGENT'] == 'Bar/1.0'
+
+ def test_environ_builder_paths(self):
+ b = EnvironBuilder(path='/foo', base_url='http://example.com/')
+ assert b.base_url == 'http://example.com/'
+ assert b.path == '/foo'
+ assert b.script_root == ''
+ assert b.host == 'example.com'
+
+ b = EnvironBuilder(path='/foo', base_url='http://example.com/bar')
+ assert b.base_url == 'http://example.com/bar/'
+ assert b.path == '/foo'
+ assert b.script_root == '/bar'
+ assert b.host == 'example.com'
+
+ b.host = 'localhost'
+ assert b.base_url == 'http://localhost/bar/'
+ b.base_url = 'http://localhost:8080/'
+ assert b.host == 'localhost:8080'
+ assert b.server_name == 'localhost'
+ assert b.server_port == 8080
+
+ b.host = 'foo.invalid'
+ b.url_scheme = 'https'
+ b.script_root = '/test'
+ env = b.get_environ()
+ assert env['SERVER_NAME'] == 'foo.invalid'
+ assert env['SERVER_PORT'] == '443'
+ assert env['SCRIPT_NAME'] == '/test'
+ assert env['PATH_INFO'] == '/foo'
+ assert env['HTTP_HOST'] == 'foo.invalid'
+ assert env['wsgi.url_scheme'] == 'https'
+ assert b.base_url == 'https://foo.invalid/test/'
+
+ def test_environ_builder_content_type(self):
+ builder = EnvironBuilder()
+ assert builder.content_type is None
+ builder.method = 'POST'
+ assert builder.content_type == 'application/x-www-form-urlencoded'
+ builder.form['foo'] = 'bar'
+ assert builder.content_type == 'application/x-www-form-urlencoded'
+ builder.files.add_file('blafasel', StringIO('foo'), 'test.txt')
+ assert builder.content_type == 'multipart/form-data'
+ req = builder.get_request()
+ assert req.form['foo'] == 'bar'
+ assert req.files['blafasel'].read() == 'foo'
+
+ def test_environ_builder_stream_switch(self):
+ d = MultiDict(dict(foo=u'bar', blub=u'blah', hu=u'hum'))
+ for use_tempfile in False, True:
+ stream, length, boundary = stream_encode_multipart(
+ d, use_tempfile, threshold=150)
+ assert isinstance(stream, OutputType) != use_tempfile
+
+ form = parse_form_data({'wsgi.input': stream, 'CONTENT_LENGTH': str(length),
+ 'CONTENT_TYPE': 'multipart/form-data; boundary="%s"' %
+ boundary})[1]
+ assert form == d
+
+ def test_create_environ(self):
+ env = create_environ('/foo?bar=baz', 'http://example.org/')
+ expected = {
+ 'wsgi.multiprocess': False,
+ 'wsgi.version': (1, 0),
+ 'wsgi.run_once': False,
+ 'wsgi.errors': sys.stderr,
+ 'wsgi.multithread': False,
+ 'wsgi.url_scheme': 'http',
+ 'SCRIPT_NAME': '',
+ 'CONTENT_TYPE': '',
+ 'CONTENT_LENGTH': '0',
+ 'SERVER_NAME': 'example.org',
+ 'REQUEST_METHOD': 'GET',
+ 'HTTP_HOST': 'example.org',
+ 'PATH_INFO': '/foo',
+ 'SERVER_PORT': '80',
+ 'SERVER_PROTOCOL': 'HTTP/1.1',
+ 'QUERY_STRING': 'bar=baz'
+ }
+ for key, value in expected.iteritems():
+ assert env[key] == value
+ assert env['wsgi.input'].read(0) == ''
+
+ assert create_environ('/foo', 'http://example.com/')['SCRIPT_NAME'] == ''
+
+ def test_file_closing(self):
+ closed = []
+ class SpecialInput(object):
+ def read(self):
+ return ''
+ def close(self):
+ closed.append(self)
+
+ env = create_environ(data={'foo': SpecialInput()})
+ assert len(closed) == 1
+ builder = EnvironBuilder()
+ builder.files.add_file('blah', SpecialInput())
+ builder.close()
+ assert len(closed) == 2
+
+ def test_follow_redirect(self):
+ env = create_environ('/', base_url='http://localhost')
+ c = Client(redirect_with_get_app)
+ appiter, code, headers = c.open(environ_overrides=env, follow_redirects=True)
+ assert code == '200 OK'
+ assert ''.join(appiter) == 'current url: http://localhost/some/redirect/'
+
+ # Test that the :cls:`Client` is aware of user defined response wrappers
+ c = Client(redirect_with_get_app, response_wrapper=BaseResponse)
+ resp = c.get('/', follow_redirects=True)
+ assert resp.status_code == 200
+ assert resp.data == 'current url: http://localhost/some/redirect/'
+
+ # test with URL other than '/' to make sure redirected URL's are correct
+ c = Client(redirect_with_get_app, response_wrapper=BaseResponse)
+ resp = c.get('/first/request', follow_redirects=True)
+ assert resp.status_code == 200
+ assert resp.data == 'current url: http://localhost/some/redirect/'
+
+ def test_follow_external_redirect(self):
+ env = create_environ('/', base_url='http://localhost')
+ c = Client(external_redirect_demo_app)
+ self.assert_raises(RuntimeError, lambda:
+ c.get(environ_overrides=env, follow_redirects=True))
+
+ def test_follow_external_redirect_on_same_subdomain(self):
+ env = create_environ('/', base_url='http://example.com')
+ c = Client(external_subdomain_redirect_demo_app, allow_subdomain_redirects=True)
+ c.get(environ_overrides=env, follow_redirects=True)
+
+ # check that this does not work for real external domains
+ env = create_environ('/', base_url='http://localhost')
+ self.assert_raises(RuntimeError, lambda:
+ c.get(environ_overrides=env, follow_redirects=True))
+
+ # check that subdomain redirects fail if no `allow_subdomain_redirects` is applied
+ c = Client(external_subdomain_redirect_demo_app)
+ self.assert_raises(RuntimeError, lambda:
+ c.get(environ_overrides=env, follow_redirects=True))
+
+ def test_follow_redirect_loop(self):
+ c = Client(redirect_loop_app, response_wrapper=BaseResponse)
+ with self.assert_raises(ClientRedirectError):
+ resp = c.get('/', follow_redirects=True)
+
+ def test_follow_redirect_with_post(self):
+ c = Client(redirect_with_post_app, response_wrapper=BaseResponse)
+ resp = c.post('/', follow_redirects=True, data='foo=blub+hehe&blah=42')
+ assert resp.status_code == 200
+ assert resp.data == 'current url: http://localhost/some/redirect/'
+
+ def test_path_info_script_name_unquoting(self):
+ def test_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [environ['PATH_INFO'] + '\n' + environ['SCRIPT_NAME']]
+ c = Client(test_app, response_wrapper=BaseResponse)
+ resp = c.get('/foo%40bar')
+ assert resp.data == '/foo@bar\n'
+ c = Client(test_app, response_wrapper=BaseResponse)
+ resp = c.get('/foo%40bar', 'http://localhost/bar%40baz')
+ assert resp.data == '/foo@bar\n/bar@baz'
+
+ def test_multi_value_submit(self):
+ c = Client(multi_value_post_app, response_wrapper=BaseResponse)
+ data = {
+ 'field': ['val1','val2']
+ }
+ resp = c.post('/', data=data)
+ assert resp.status_code == 200
+ c = Client(multi_value_post_app, response_wrapper=BaseResponse)
+ data = MultiDict({
+ 'field': ['val1','val2']
+ })
+ resp = c.post('/', data=data)
+ assert resp.status_code == 200
+
+ def test_iri_support(self):
+ b = EnvironBuilder(u'/föö-bar', base_url=u'http://☃.net/')
+ assert b.path == '/f%C3%B6%C3%B6-bar'
+ assert b.base_url == 'http://xn--n3h.net/'
+
+ def test_run_wsgi_apps(self):
+ def simple_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return ['Hello World!']
+ app_iter, status, headers = run_wsgi_app(simple_app, {})
+ assert status == '200 OK'
+ assert headers == [('Content-Type', 'text/html')]
+ assert app_iter == ['Hello World!']
+
+ def yielding_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ yield 'Hello '
+ yield 'World!'
+ app_iter, status, headers = run_wsgi_app(yielding_app, {})
+ assert status == '200 OK'
+ assert headers == [('Content-Type', 'text/html')]
+ assert list(app_iter) == ['Hello ', 'World!']
+
+ def test_multiple_cookies(self):
+ @Request.application
+ def test_app(request):
+ response = Response(repr(sorted(request.cookies.items())))
+ response.set_cookie('test1', 'foo')
+ response.set_cookie('test2', 'bar')
+ return response
+ client = Client(test_app, Response)
+ resp = client.get('/')
+ assert resp.data == '[]'
+ resp = client.get('/')
+ assert resp.data == "[('test1', u'foo'), ('test2', u'bar')]"
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/urls.py b/websdk/werkzeug/testsuite/urls.py
new file mode 100644
index 0000000..8c908b8
--- /dev/null
+++ b/websdk/werkzeug/testsuite/urls.py
@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.urls
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ URL helper tests.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+import unittest
+from StringIO import StringIO
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.datastructures import OrderedMultiDict
+from werkzeug import urls
+
+
+class URLsTestCase(WerkzeugTestCase):
+
+ def test_quoting(self):
+ assert urls.url_quote(u'\xf6\xe4\xfc') == '%C3%B6%C3%A4%C3%BC'
+ assert urls.url_unquote(urls.url_quote(u'#%="\xf6')) == u'#%="\xf6'
+ assert urls.url_quote_plus('foo bar') == 'foo+bar'
+ assert urls.url_unquote_plus('foo+bar') == 'foo bar'
+ assert urls.url_encode({'a': None, 'b': 'foo bar'}) == 'b=foo+bar'
+ assert urls.url_fix(u'http://de.wikipedia.org/wiki/Elf (Begriffsklärung)') == \
+ 'http://de.wikipedia.org/wiki/Elf%20%28Begriffskl%C3%A4rung%29'
+
+ def test_url_decoding(self):
+ x = urls.url_decode('foo=42&bar=23&uni=H%C3%A4nsel')
+ assert x['foo'] == '42'
+ assert x['bar'] == '23'
+ assert x['uni'] == u'Hänsel'
+
+ x = urls.url_decode('foo=42;bar=23;uni=H%C3%A4nsel', separator=';')
+ assert x['foo'] == '42'
+ assert x['bar'] == '23'
+ assert x['uni'] == u'Hänsel'
+
+ x = urls.url_decode('%C3%9Ch=H%C3%A4nsel', decode_keys=True)
+ assert x[u'Üh'] == u'Hänsel'
+
+ def test_streamed_url_decoding(self):
+ item1 = 'a' * 100000
+ item2 = 'b' * 400
+ string = 'a=%s&b=%s&c=%s' % (item1, item2, item2)
+ gen = urls.url_decode_stream(StringIO(string), limit=len(string),
+ return_iterator=True)
+ self.assert_equal(gen.next(), ('a', item1))
+ self.assert_equal(gen.next(), ('b', item2))
+ self.assert_equal(gen.next(), ('c', item2))
+ self.assert_raises(StopIteration, gen.next)
+
+ def test_url_encoding(self):
+ assert urls.url_encode({'foo': 'bar 45'}) == 'foo=bar+45'
+ d = {'foo': 1, 'bar': 23, 'blah': u'Hänsel'}
+ assert urls.url_encode(d, sort=True) == 'bar=23&blah=H%C3%A4nsel&foo=1'
+ assert urls.url_encode(d, sort=True, separator=';') == 'bar=23;blah=H%C3%A4nsel;foo=1'
+
+ def test_sorted_url_encode(self):
+ assert urls.url_encode({"a": 42, "b": 23, 1: 1, 2: 2}, sort=True) == '1=1&2=2&a=42&b=23'
+ assert urls.url_encode({'A': 1, 'a': 2, 'B': 3, 'b': 4}, sort=True,
+ key=lambda x: x[0].lower()) == 'A=1&a=2&B=3&b=4'
+
+ def test_streamed_url_encoding(self):
+ out = StringIO()
+ urls.url_encode_stream({'foo': 'bar 45'}, out)
+ self.assert_equal(out.getvalue(), 'foo=bar+45')
+
+ d = {'foo': 1, 'bar': 23, 'blah': u'Hänsel'}
+ out = StringIO()
+ urls.url_encode_stream(d, out, sort=True)
+ self.assert_equal(out.getvalue(), 'bar=23&blah=H%C3%A4nsel&foo=1')
+ out = StringIO()
+ urls.url_encode_stream(d, out, sort=True, separator=';')
+ self.assert_equal(out.getvalue(), 'bar=23;blah=H%C3%A4nsel;foo=1')
+
+ gen = urls.url_encode_stream(d, sort=True)
+ self.assert_equal(gen.next(), 'bar=23')
+ self.assert_equal(gen.next(), 'blah=H%C3%A4nsel')
+ self.assert_equal(gen.next(), 'foo=1')
+ self.assert_raises(StopIteration, gen.next)
+
+ def test_url_fixing(self):
+ x = urls.url_fix(u'http://de.wikipedia.org/wiki/Elf (Begriffskl\xe4rung)')
+ assert x == 'http://de.wikipedia.org/wiki/Elf%20%28Begriffskl%C3%A4rung%29'
+
+ x = urls.url_fix('http://example.com/?foo=%2f%2f')
+ assert x == 'http://example.com/?foo=%2f%2f'
+
+ def test_iri_support(self):
+ self.assert_raises(UnicodeError, urls.uri_to_iri, u'http://föö.com/')
+ self.assert_raises(UnicodeError, urls.iri_to_uri, 'http://föö.com/')
+ assert urls.uri_to_iri('http://xn--n3h.net/') == u'http://\u2603.net/'
+ assert urls.uri_to_iri('http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th') == \
+ u'http://\xfcser:p\xe4ssword@\u2603.net/p\xe5th'
+ assert urls.iri_to_uri(u'http://☃.net/') == 'http://xn--n3h.net/'
+ assert urls.iri_to_uri(u'http://üser:pässword@☃.net/påth') == \
+ 'http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th'
+
+ assert urls.uri_to_iri('http://test.com/%3Fmeh?foo=%26%2F') == \
+ u'http://test.com/%3Fmeh?foo=%26%2F'
+
+ # this should work as well, might break on 2.4 because of a broken
+ # idna codec
+ assert urls.uri_to_iri('/foo') == u'/foo'
+ assert urls.iri_to_uri(u'/foo') == '/foo'
+
+ def test_ordered_multidict_encoding(self):
+ d = OrderedMultiDict()
+ d.add('foo', 1)
+ d.add('foo', 2)
+ d.add('foo', 3)
+ d.add('bar', 0)
+ d.add('foo', 4)
+ assert urls.url_encode(d) == 'foo=1&foo=2&foo=3&bar=0&foo=4'
+
+ def test_href(self):
+ x = urls.Href('http://www.example.com/')
+ assert x('foo') == 'http://www.example.com/foo'
+ assert x.foo('bar') == 'http://www.example.com/foo/bar'
+ assert x.foo('bar', x=42) == 'http://www.example.com/foo/bar?x=42'
+ assert x.foo('bar', class_=42) == 'http://www.example.com/foo/bar?class=42'
+ assert x.foo('bar', {'class': 42}) == 'http://www.example.com/foo/bar?class=42'
+ self.assert_raises(AttributeError, lambda: x.__blah__)
+
+ x = urls.Href('blah')
+ assert x.foo('bar') == 'blah/foo/bar'
+
+ self.assert_raises(TypeError, x.foo, {"foo": 23}, x=42)
+
+ x = urls.Href('')
+ assert x('foo') == 'foo'
+
+ def test_href_url_join(self):
+ x = urls.Href('test')
+ assert x('foo:bar') == 'test/foo:bar'
+ assert x('http://example.com/') == 'test/http://example.com/'
+
+ if 0:
+ # stdlib bug? :(
+ def test_href_past_root(self):
+ base_href = urls.Href('http://www.blagga.com/1/2/3')
+ assert base_href('../foo') == 'http://www.blagga.com/1/2/foo'
+ assert base_href('../../foo') == 'http://www.blagga.com/1/foo'
+ assert base_href('../../../foo') == 'http://www.blagga.com/foo'
+ assert base_href('../../../../foo') == 'http://www.blagga.com/foo'
+ assert base_href('../../../../../foo') == 'http://www.blagga.com/foo'
+ assert base_href('../../../../../../foo') == 'http://www.blagga.com/foo'
+
+ def test_url_unquote_plus_unicode(self):
+ # was broken in 0.6
+ assert urls.url_unquote_plus(u'\x6d') == u'\x6d'
+ assert type(urls.url_unquote_plus(u'\x6d')) is unicode
+
+ def test_quoting_of_local_urls(self):
+ rv = urls.iri_to_uri(u'/foo\x8f')
+ assert rv == '/foo%C2%8F'
+ assert type(rv) is str
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(URLsTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/utils.py b/websdk/werkzeug/testsuite/utils.py
new file mode 100644
index 0000000..5ef953c
--- /dev/null
+++ b/websdk/werkzeug/testsuite/utils.py
@@ -0,0 +1,255 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.utils
+ ~~~~~~~~~~~~~~~~~~~~~~~~
+
+ General utilities.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+from datetime import datetime
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import utils
+from werkzeug.datastructures import Headers
+from werkzeug.http import parse_date, http_date
+from werkzeug.wrappers import BaseResponse
+from werkzeug.test import Client, run_wsgi_app
+
+
+class GeneralUtilityTestCase(WerkzeugTestCase):
+
+ def test_redirect(self):
+ resp = utils.redirect(u'/füübär')
+ assert '/f%C3%BC%C3%BCb%C3%A4r' in resp.data
+ assert resp.headers['Location'] == '/f%C3%BC%C3%BCb%C3%A4r'
+ assert resp.status_code == 302
+
+ resp = utils.redirect(u'http://☃.net/', 307)
+ assert 'http://xn--n3h.net/' in resp.data
+ assert resp.headers['Location'] == 'http://xn--n3h.net/'
+ assert resp.status_code == 307
+
+ resp = utils.redirect('http://example.com/', 305)
+ assert resp.headers['Location'] == 'http://example.com/'
+ assert resp.status_code == 305
+
+ def test_cached_property(self):
+ foo = []
+ class A(object):
+ def prop(self):
+ foo.append(42)
+ return 42
+ prop = utils.cached_property(prop)
+
+ a = A()
+ p = a.prop
+ q = a.prop
+ assert p == q == 42
+ assert foo == [42]
+
+ foo = []
+ class A(object):
+ def _prop(self):
+ foo.append(42)
+ return 42
+ prop = utils.cached_property(_prop, name='prop')
+ del _prop
+
+ a = A()
+ p = a.prop
+ q = a.prop
+ assert p == q == 42
+ assert foo == [42]
+
+ def test_environ_property(self):
+ class A(object):
+ environ = {'string': 'abc', 'number': '42'}
+
+ string = utils.environ_property('string')
+ missing = utils.environ_property('missing', 'spam')
+ read_only = utils.environ_property('number')
+ number = utils.environ_property('number', load_func=int)
+ broken_number = utils.environ_property('broken_number', load_func=int)
+ date = utils.environ_property('date', None, parse_date, http_date,
+ read_only=False)
+ foo = utils.environ_property('foo')
+
+ a = A()
+ assert a.string == 'abc'
+ assert a.missing == 'spam'
+ def test_assign():
+ a.read_only = 'something'
+ self.assert_raises(AttributeError, test_assign)
+ assert a.number == 42
+ assert a.broken_number == None
+ assert a.date is None
+ a.date = datetime(2008, 1, 22, 10, 0, 0, 0)
+ assert a.environ['date'] == 'Tue, 22 Jan 2008 10:00:00 GMT'
+
+ def test_escape(self):
+ class Foo(str):
+ def __html__(self):
+ return unicode(self)
+ assert utils.escape(None) == ''
+ assert utils.escape(42) == '42'
+ assert utils.escape('<>') == '&lt;&gt;'
+ assert utils.escape('"foo"') == '"foo"'
+ assert utils.escape('"foo"', True) == '&quot;foo&quot;'
+ assert utils.escape(Foo('<foo>')) == '<foo>'
+
+ def test_unescape(self):
+ assert utils.unescape('&lt;&auml;&gt;') == u'<ä>'
+
+ def test_run_wsgi_app(self):
+ def foo(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ yield '1'
+ yield '2'
+ yield '3'
+
+ app_iter, status, headers = run_wsgi_app(foo, {})
+ assert status == '200 OK'
+ assert headers == [('Content-Type', 'text/plain')]
+ assert app_iter.next() == '1'
+ assert app_iter.next() == '2'
+ assert app_iter.next() == '3'
+ self.assert_raises(StopIteration, app_iter.next)
+
+ got_close = []
+ class CloseIter(object):
+ def __init__(self):
+ self.iterated = False
+ def __iter__(self):
+ return self
+ def close(self):
+ got_close.append(None)
+ def next(self):
+ if self.iterated:
+ raise StopIteration()
+ self.iterated = True
+ return 'bar'
+
+ def bar(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return CloseIter()
+
+ app_iter, status, headers = run_wsgi_app(bar, {})
+ assert status == '200 OK'
+ assert headers == [('Content-Type', 'text/plain')]
+ assert app_iter.next() == 'bar'
+ self.assert_raises(StopIteration, app_iter.next)
+ app_iter.close()
+
+ assert run_wsgi_app(bar, {}, True)[0] == ['bar']
+
+ assert len(got_close) == 2
+
+ def test_import_string(self):
+ import cgi
+ from werkzeug.debug import DebuggedApplication
+ assert utils.import_string('cgi.escape') is cgi.escape
+ assert utils.import_string(u'cgi.escape') is cgi.escape
+ assert utils.import_string('cgi:escape') is cgi.escape
+ assert utils.import_string('XXXXXXXXXXXX', True) is None
+ assert utils.import_string('cgi.XXXXXXXXXXXX', True) is None
+ assert utils.import_string(u'cgi.escape') is cgi.escape
+ assert utils.import_string(u'werkzeug.debug.DebuggedApplication') is DebuggedApplication
+ self.assert_raises(ImportError, utils.import_string, 'XXXXXXXXXXXXXXXX')
+ self.assert_raises(ImportError, utils.import_string, 'cgi.XXXXXXXXXX')
+
+ def test_find_modules(self):
+ assert list(utils.find_modules('werkzeug.debug')) == \
+ ['werkzeug.debug.console', 'werkzeug.debug.repr',
+ 'werkzeug.debug.tbtools']
+
+ def test_html_builder(self):
+ html = utils.html
+ xhtml = utils.xhtml
+ assert html.p('Hello World') == '<p>Hello World</p>'
+ assert html.a('Test', href='#') == '<a href="#">Test</a>'
+ assert html.br() == '<br>'
+ assert xhtml.br() == '<br />'
+ assert html.img(src='foo') == '<img src="foo">'
+ assert xhtml.img(src='foo') == '<img src="foo" />'
+ assert html.html(
+ html.head(
+ html.title('foo'),
+ html.script(type='text/javascript')
+ )
+ ) == '<html><head><title>foo</title><script type="text/javascript">' \
+ '</script></head></html>'
+ assert html('<foo>') == '&lt;foo&gt;'
+ assert html.input(disabled=True) == '<input disabled>'
+ assert xhtml.input(disabled=True) == '<input disabled="disabled" />'
+ assert html.input(disabled='') == '<input>'
+ assert xhtml.input(disabled='') == '<input />'
+ assert html.input(disabled=None) == '<input>'
+ assert xhtml.input(disabled=None) == '<input />'
+ assert html.script('alert("Hello World");') == '<script>' \
+ 'alert("Hello World");</script>'
+ assert xhtml.script('alert("Hello World");') == '<script>' \
+ '/*<![CDATA[*/alert("Hello World");/*]]>*/</script>'
+
+ def test_validate_arguments(self):
+ take_none = lambda: None
+ take_two = lambda a, b: None
+ take_two_one_default = lambda a, b=0: None
+
+ assert utils.validate_arguments(take_two, (1, 2,), {}) == ((1, 2), {})
+ assert utils.validate_arguments(take_two, (1,), {'b': 2}) == ((1, 2), {})
+ assert utils.validate_arguments(take_two_one_default, (1,), {}) == ((1, 0), {})
+ assert utils.validate_arguments(take_two_one_default, (1, 2), {}) == ((1, 2), {})
+
+ self.assert_raises(utils.ArgumentValidationError,
+ utils.validate_arguments, take_two, (), {})
+
+ assert utils.validate_arguments(take_none, (1, 2,), {'c': 3}) == ((), {})
+ self.assert_raises(utils.ArgumentValidationError,
+ utils.validate_arguments, take_none, (1,), {}, drop_extra=False)
+ self.assert_raises(utils.ArgumentValidationError,
+ utils.validate_arguments, take_none, (), {'a': 1}, drop_extra=False)
+
+ def test_header_set_duplication_bug(self):
+ headers = Headers([
+ ('Content-Type', 'text/html'),
+ ('Foo', 'bar'),
+ ('Blub', 'blah')
+ ])
+ headers['blub'] = 'hehe'
+ headers['blafasel'] = 'humm'
+ assert headers == Headers([
+ ('Content-Type', 'text/html'),
+ ('Foo', 'bar'),
+ ('blub', 'hehe'),
+ ('blafasel', 'humm')
+ ])
+
+ def test_append_slash_redirect(self):
+ def app(env, sr):
+ return utils.append_slash_redirect(env)(env, sr)
+ client = Client(app, BaseResponse)
+ response = client.get('foo', base_url='http://example.org/app')
+ assert response.status_code == 301
+ assert response.headers['Location'] == 'http://example.org/app/foo/'
+
+ def test_cached_property_doc(self):
+ @utils.cached_property
+ def foo():
+ """testing"""
+ return 42
+ assert foo.__doc__ == 'testing'
+ assert foo.__name__ == 'foo'
+ assert foo.__module__ == __name__
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(GeneralUtilityTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/wrappers.py b/websdk/werkzeug/testsuite/wrappers.py
new file mode 100644
index 0000000..b18f891
--- /dev/null
+++ b/websdk/werkzeug/testsuite/wrappers.py
@@ -0,0 +1,662 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.wrappers
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests for the response and request objects.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+import unittest
+import pickle
+from StringIO import StringIO
+from datetime import datetime
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug import wrappers
+from werkzeug.datastructures import MultiDict, ImmutableOrderedMultiDict, \
+ ImmutableList, ImmutableTypeConversionDict, CharsetAccept, \
+ CombinedMultiDict
+from werkzeug.test import Client, create_environ, run_wsgi_app
+
+
+class RequestTestResponse(wrappers.BaseResponse):
+ """Subclass of the normal response class we use to test response
+ and base classes. Has some methods to test if things in the
+ response match.
+ """
+
+ def __init__(self, response, status, headers):
+ wrappers.BaseResponse.__init__(self, response, status, headers)
+ self.body_data = pickle.loads(self.data)
+
+ def __getitem__(self, key):
+ return self.body_data[key]
+
+
+def request_demo_app(environ, start_response):
+ request = wrappers.BaseRequest(environ)
+ assert 'werkzeug.request' in environ
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [pickle.dumps({
+ 'args': request.args,
+ 'args_as_list': request.args.lists(),
+ 'form': request.form,
+ 'form_as_list': request.form.lists(),
+ 'environ': prepare_environ_pickle(request.environ),
+ 'data': request.data
+ })]
+
+
+def prepare_environ_pickle(environ):
+ result = {}
+ for key, value in environ.iteritems():
+ try:
+ pickle.dumps((key, value))
+ except Exception:
+ continue
+ result[key] = value
+ return result
+
+
+class WrappersTestCase(WerkzeugTestCase):
+
+ def assert_environ(self, environ, method):
+ assert environ['REQUEST_METHOD'] == method
+ assert environ['PATH_INFO'] == '/'
+ assert environ['SCRIPT_NAME'] == ''
+ assert environ['SERVER_NAME'] == 'localhost'
+ assert environ['wsgi.version'] == (1, 0)
+ assert environ['wsgi.url_scheme'] == 'http'
+
+ def test_base_request(self):
+ client = Client(request_demo_app, RequestTestResponse)
+
+ # get requests
+ response = client.get('/?foo=bar&foo=hehe')
+ assert response['args'] == MultiDict([('foo', 'bar'), ('foo', 'hehe')])
+ assert response['args_as_list'] == [('foo', ['bar', 'hehe'])]
+ assert response['form'] == MultiDict()
+ assert response['form_as_list'] == []
+ assert response['data'] == ''
+ self.assert_environ(response['environ'], 'GET')
+
+ # post requests with form data
+ response = client.post('/?blub=blah', data='foo=blub+hehe&blah=42',
+ content_type='application/x-www-form-urlencoded')
+ assert response['args'] == MultiDict([('blub', 'blah')])
+ assert response['args_as_list'] == [('blub', ['blah'])]
+ assert response['form'] == MultiDict([('foo', 'blub hehe'), ('blah', '42')])
+ assert response['data'] == ''
+ # currently we do not guarantee that the values are ordered correctly
+ # for post data.
+ ## assert response['form_as_list'] == [('foo', ['blub hehe']), ('blah', ['42'])]
+ self.assert_environ(response['environ'], 'POST')
+
+ # patch requests with form data
+ response = client.patch('/?blub=blah', data='foo=blub+hehe&blah=42',
+ content_type='application/x-www-form-urlencoded')
+ assert response['args'] == MultiDict([('blub', 'blah')])
+ assert response['args_as_list'] == [('blub', ['blah'])]
+ assert response['form'] == MultiDict([('foo', 'blub hehe'), ('blah', '42')])
+ assert response['data'] == ''
+ self.assert_environ(response['environ'], 'PATCH')
+
+ # post requests with json data
+ json = '{"foo": "bar", "blub": "blah"}'
+ response = client.post('/?a=b', data=json, content_type='application/json')
+ assert response['data'] == json
+ assert response['args'] == MultiDict([('a', 'b')])
+ assert response['form'] == MultiDict()
+
+ def test_access_route(self):
+ req = wrappers.Request.from_values(headers={
+ 'X-Forwarded-For': '192.168.1.2, 192.168.1.1'
+ })
+ req.environ['REMOTE_ADDR'] = '192.168.1.3'
+ assert req.access_route == ['192.168.1.2', '192.168.1.1']
+ assert req.remote_addr == '192.168.1.3'
+
+ req = wrappers.Request.from_values()
+ req.environ['REMOTE_ADDR'] = '192.168.1.3'
+ assert req.access_route == ['192.168.1.3']
+
+ def test_url_request_descriptors(self):
+ req = wrappers.Request.from_values('/bar?foo=baz', 'http://example.com/test')
+ assert req.path == u'/bar'
+ assert req.script_root == u'/test'
+ assert req.url == 'http://example.com/test/bar?foo=baz'
+ assert req.base_url == 'http://example.com/test/bar'
+ assert req.url_root == 'http://example.com/test/'
+ assert req.host_url == 'http://example.com/'
+ assert req.host == 'example.com'
+ assert req.scheme == 'http'
+
+ req = wrappers.Request.from_values('/bar?foo=baz', 'https://example.com/test')
+ assert req.scheme == 'https'
+
+ def test_authorization_mixin(self):
+ request = wrappers.Request.from_values(headers={
+ 'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='
+ })
+ a = request.authorization
+ assert a.type == 'basic'
+ assert a.username == 'Aladdin'
+ assert a.password == 'open sesame'
+
+ def test_base_response(self):
+ # unicode
+ response = wrappers.BaseResponse(u'öäü')
+ assert response.data == 'öäü'
+
+ # writing
+ response = wrappers.Response('foo')
+ response.stream.write('bar')
+ assert response.data == 'foobar'
+
+ # set cookie
+ response = wrappers.BaseResponse()
+ response.set_cookie('foo', 'bar', 60, 0, '/blub', 'example.org', False)
+ assert response.headers.to_list() == [
+ ('Content-Type', 'text/plain; charset=utf-8'),
+ ('Set-Cookie', 'foo=bar; Domain=example.org; expires=Thu, '
+ '01-Jan-1970 00:00:00 GMT; Max-Age=60; Path=/blub')
+ ]
+
+ # delete cookie
+ response = wrappers.BaseResponse()
+ response.delete_cookie('foo')
+ assert response.headers.to_list() == [
+ ('Content-Type', 'text/plain; charset=utf-8'),
+ ('Set-Cookie', 'foo=; expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/')
+ ]
+
+ # close call forwarding
+ closed = []
+ class Iterable(object):
+ def next(self):
+ raise StopIteration()
+ def __iter__(self):
+ return self
+ def close(self):
+ closed.append(True)
+ response = wrappers.BaseResponse(Iterable())
+ response.call_on_close(lambda: closed.append(True))
+ app_iter, status, headers = run_wsgi_app(response,
+ create_environ(),
+ buffered=True)
+ assert status == '200 OK'
+ assert ''.join(app_iter) == ''
+ assert len(closed) == 2
+
+ def test_response_status_codes(self):
+ response = wrappers.BaseResponse()
+ response.status_code = 404
+ assert response.status == '404 NOT FOUND'
+ response.status = '200 OK'
+ assert response.status_code == 200
+ response.status = '999 WTF'
+ assert response.status_code == 999
+ response.status_code = 588
+ assert response.status_code == 588
+ assert response.status == '588 UNKNOWN'
+ response.status = 'wtf'
+ assert response.status_code == 0
+
+ def test_type_forcing(self):
+ def wsgi_application(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return ['Hello World!']
+ base_response = wrappers.BaseResponse('Hello World!', content_type='text/html')
+
+ class SpecialResponse(wrappers.Response):
+ def foo(self):
+ return 42
+
+ # good enough for this simple application, but don't ever use that in
+ # real world examples!
+ fake_env = {}
+
+ for orig_resp in wsgi_application, base_response:
+ response = SpecialResponse.force_type(orig_resp, fake_env)
+ assert response.__class__ is SpecialResponse
+ assert response.foo() == 42
+ assert response.data == 'Hello World!'
+ assert response.content_type == 'text/html'
+
+ # without env, no arbitrary conversion
+ self.assert_raises(TypeError, SpecialResponse.force_type, wsgi_application)
+
+ def test_accept_mixin(self):
+ request = wrappers.Request({
+ 'HTTP_ACCEPT': 'text/xml,application/xml,application/xhtml+xml,'
+ 'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5',
+ 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
+ 'HTTP_ACCEPT_ENCODING': 'gzip,deflate',
+ 'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5'
+ })
+ assert request.accept_mimetypes == CharsetAccept([
+ ('text/xml', 1), ('image/png', 1), ('application/xml', 1),
+ ('application/xhtml+xml', 1), ('text/html', 0.9),
+ ('text/plain', 0.8), ('*/*', 0.5)
+ ])
+ assert request.accept_charsets == CharsetAccept([
+ ('ISO-8859-1', 1), ('utf-8', 0.7), ('*', 0.7)
+ ])
+ assert request.accept_encodings == CharsetAccept([('gzip', 1), ('deflate', 1)])
+ assert request.accept_languages == CharsetAccept([('en-us', 1), ('en', 0.5)])
+
+ request = wrappers.Request({'HTTP_ACCEPT': ''})
+ assert request.accept_mimetypes == CharsetAccept()
+
+ def test_etag_request_mixin(self):
+ request = wrappers.Request({
+ 'HTTP_CACHE_CONTROL': 'no-store, no-cache',
+ 'HTTP_IF_MATCH': 'w/"foo", bar, "baz"',
+ 'HTTP_IF_NONE_MATCH': 'w/"foo", bar, "baz"',
+ 'HTTP_IF_MODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT',
+ 'HTTP_IF_UNMODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT'
+ })
+ assert request.cache_control.no_store
+ assert request.cache_control.no_cache
+
+ for etags in request.if_match, request.if_none_match:
+ assert etags('bar')
+ assert etags.contains_raw('w/"foo"')
+ assert etags.contains_weak('foo')
+ assert not etags.contains('foo')
+
+ assert request.if_modified_since == datetime(2008, 1, 22, 11, 18, 44)
+ assert request.if_unmodified_since == datetime(2008, 1, 22, 11, 18, 44)
+
+ def test_user_agent_mixin(self):
+ user_agents = [
+ ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) '
+ 'Gecko/20071127 Firefox/2.0.0.11', 'firefox', 'macos', '2.0.0.11',
+ 'en-US'),
+ ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54',
+ 'opera', 'windows', '8.54', 'de-DE'),
+ ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 '
+ '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3',
+ 'safari', 'iphone', '419.3', 'en'),
+ ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)',
+ 'google', None, '2.1', None)
+ ]
+ for ua, browser, platform, version, lang in user_agents:
+ request = wrappers.Request({'HTTP_USER_AGENT': ua})
+ assert request.user_agent.browser == browser
+ assert request.user_agent.platform == platform
+ assert request.user_agent.version == version
+ assert request.user_agent.language == lang
+ assert bool(request.user_agent)
+ assert request.user_agent.to_header() == ua
+ assert str(request.user_agent) == ua
+
+ request = wrappers.Request({'HTTP_USER_AGENT': 'foo'})
+ assert not request.user_agent
+
+ def test_etag_response_mixin(self):
+ response = wrappers.Response('Hello World')
+ assert response.get_etag() == (None, None)
+ response.add_etag()
+ assert response.get_etag() == ('b10a8db164e0754105b7a99be72e3fe5', False)
+ assert not response.cache_control
+ response.cache_control.must_revalidate = True
+ response.cache_control.max_age = 60
+ response.headers['Content-Length'] = len(response.data)
+ assert response.headers['Cache-Control'] == 'must-revalidate, max-age=60'
+
+ assert 'date' not in response.headers
+ env = create_environ()
+ env.update({
+ 'REQUEST_METHOD': 'GET',
+ 'HTTP_IF_NONE_MATCH': response.get_etag()[0]
+ })
+ response.make_conditional(env)
+ assert 'date' in response.headers
+
+ # after the thing is invoked by the server as wsgi application
+ # (we're emulating this here), there must not be any entity
+ # headers left and the status code would have to be 304
+ resp = wrappers.Response.from_app(response, env)
+ assert resp.status_code == 304
+ assert not 'content-length' in resp.headers
+
+ # make sure date is not overriden
+ response = wrappers.Response('Hello World')
+ response.date = 1337
+ d = response.date
+ response.make_conditional(env)
+ assert response.date == d
+
+ # make sure content length is only set if missing
+ response = wrappers.Response('Hello World')
+ response.content_length = 999
+ response.make_conditional(env)
+ self.assert_equal(response.content_length, 999)
+
+ def test_etag_response_mixin_freezing(self):
+ class WithFreeze(wrappers.ETagResponseMixin, wrappers.BaseResponse):
+ pass
+ class WithoutFreeze(wrappers.BaseResponse, wrappers.ETagResponseMixin):
+ pass
+
+ response = WithFreeze('Hello World')
+ response.freeze()
+ assert response.get_etag() == (wrappers.generate_etag('Hello World'), False)
+ response = WithoutFreeze('Hello World')
+ response.freeze()
+ assert response.get_etag() == (None, None)
+ response = wrappers.Response('Hello World')
+ response.freeze()
+ assert response.get_etag() == (None, None)
+
+ def test_authenticate_mixin(self):
+ resp = wrappers.Response()
+ resp.www_authenticate.type = 'basic'
+ resp.www_authenticate.realm = 'Testing'
+ assert resp.headers['WWW-Authenticate'] == 'Basic realm="Testing"'
+ resp.www_authenticate.realm = None
+ resp.www_authenticate.type = None
+ assert 'WWW-Authenticate' not in resp.headers
+
+ def test_response_stream_mixin(self):
+ response = wrappers.Response()
+ response.stream.write('Hello ')
+ response.stream.write('World!')
+ assert response.response == ['Hello ', 'World!']
+ assert response.data == 'Hello World!'
+
+ def test_common_response_descriptors_mixin(self):
+ response = wrappers.Response()
+ response.mimetype = 'text/html'
+ assert response.mimetype == 'text/html'
+ assert response.content_type == 'text/html; charset=utf-8'
+ assert response.mimetype_params == {'charset': 'utf-8'}
+ response.mimetype_params['x-foo'] = 'yep'
+ del response.mimetype_params['charset']
+ assert response.content_type == 'text/html; x-foo=yep'
+
+ now = datetime.utcnow().replace(microsecond=0)
+
+ assert response.content_length is None
+ response.content_length = '42'
+ assert response.content_length == 42
+
+ for attr in 'date', 'age', 'expires':
+ assert getattr(response, attr) is None
+ setattr(response, attr, now)
+ assert getattr(response, attr) == now
+
+ assert response.retry_after is None
+ response.retry_after = now
+ assert response.retry_after == now
+
+ assert not response.vary
+ response.vary.add('Cookie')
+ response.vary.add('Content-Language')
+ assert 'cookie' in response.vary
+ assert response.vary.to_header() == 'Cookie, Content-Language'
+ response.headers['Vary'] = 'Content-Encoding'
+ assert response.vary.as_set() == set(['content-encoding'])
+
+ response.allow.update(['GET', 'POST'])
+ assert response.headers['Allow'] == 'GET, POST'
+
+ response.content_language.add('en-US')
+ response.content_language.add('fr')
+ assert response.headers['Content-Language'] == 'en-US, fr'
+
+
+ def test_common_request_descriptors_mixin(self):
+ request = wrappers.Request.from_values(content_type='text/html; charset=utf-8',
+ content_length='23',
+ headers={
+ 'Referer': 'http://www.example.com/',
+ 'Date': 'Sat, 28 Feb 2009 19:04:35 GMT',
+ 'Max-Forwards': '10',
+ 'Pragma': 'no-cache'
+ })
+
+ assert request.content_type == 'text/html; charset=utf-8'
+ assert request.mimetype == 'text/html'
+ assert request.mimetype_params == {'charset': 'utf-8'}
+ assert request.content_length == 23
+ assert request.referrer == 'http://www.example.com/'
+ assert request.date == datetime(2009, 2, 28, 19, 4, 35)
+ assert request.max_forwards == 10
+ assert 'no-cache' in request.pragma
+
+ def test_shallow_mode(self):
+ """Request object shallow mode"""
+ request = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True)
+ assert request.args['foo'] == 'bar'
+ self.assert_raises(RuntimeError, lambda: request.form['foo'])
+
+ def test_form_parsing_failed(self):
+ data = (
+ '--blah\r\n'
+ )
+ data = wrappers.Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ assert not data.files
+ assert not data.form
+
+ def test_url_charset_reflection(self):
+ req = wrappers.Request.from_values()
+ req.charset = 'utf-7'
+ assert req.url_charset == 'utf-7'
+
+ def test_response_streamed(self):
+ r = wrappers.Response()
+ assert not r.is_streamed
+ r = wrappers.Response("Hello World")
+ assert not r.is_streamed
+ r = wrappers.Response(["foo", "bar"])
+ assert not r.is_streamed
+ def gen():
+ if 0:
+ yield None
+ r = wrappers.Response(gen())
+ assert r.is_streamed
+
+ def test_response_freeze(self):
+ def generate():
+ yield "foo"
+ yield "bar"
+ resp = wrappers.Response(generate())
+ resp.freeze()
+ assert resp.response == ['foo', 'bar']
+ assert resp.headers['content-length'] == '6'
+
+ def test_other_method_payload(self):
+ data = 'Hello World'
+ req = wrappers.Request.from_values(input_stream=StringIO(data),
+ content_length=len(data),
+ content_type='text/plain',
+ method='WHAT_THE_FUCK')
+ assert req.data == data
+ assert isinstance(req.stream, wrappers.LimitedStream)
+
+ def test_urlfication(self):
+ resp = wrappers.Response()
+ resp.headers['Location'] = u'http://üser:pässword@☃.net/påth'
+ resp.headers['Content-Location'] = u'http://☃.net/'
+ headers = resp.get_wsgi_headers(create_environ())
+ assert headers['location'] == \
+ 'http://%C3%BCser:p%C3%A4ssword@xn--n3h.net/p%C3%A5th'
+ assert headers['content-location'] == 'http://xn--n3h.net/'
+
+ def test_new_response_iterator_behavior(self):
+ req = wrappers.Request.from_values()
+ resp = wrappers.Response(u'Hello Wörld!')
+
+ def get_content_length(resp):
+ headers = wrappers.Headers.linked(resp.get_wsgi_headers(req.environ))
+ return headers.get('content-length', type=int)
+
+ def generate_items():
+ yield "Hello "
+ yield u"Wörld!"
+
+ # werkzeug encodes when set to `data` now, which happens
+ # if a string is passed to the response object.
+ assert resp.response == [u'Hello Wörld!'.encode('utf-8')]
+ assert resp.data == u'Hello Wörld!'.encode('utf-8')
+ assert get_content_length(resp) == 13
+ assert not resp.is_streamed
+ assert resp.is_sequence
+
+ # try the same for manual assignment
+ resp.data = u'Wörd'
+ assert resp.response == [u'Wörd'.encode('utf-8')]
+ assert resp.data == u'Wörd'.encode('utf-8')
+ assert get_content_length(resp) == 5
+ assert not resp.is_streamed
+ assert resp.is_sequence
+
+ # automatic generator sequence conversion
+ resp.response = generate_items()
+ assert resp.is_streamed
+ assert not resp.is_sequence
+ assert resp.data == u'Hello Wörld!'.encode('utf-8')
+ assert resp.response == ['Hello ', u'Wörld!'.encode('utf-8')]
+ assert not resp.is_streamed
+ assert resp.is_sequence
+
+ # automatic generator sequence conversion
+ resp.response = generate_items()
+ resp.implicit_sequence_conversion = False
+ assert resp.is_streamed
+ assert not resp.is_sequence
+ self.assert_raises(RuntimeError, lambda: resp.data)
+ resp.make_sequence()
+ assert resp.data == u'Hello Wörld!'.encode('utf-8')
+ assert resp.response == ['Hello ', u'Wörld!'.encode('utf-8')]
+ assert not resp.is_streamed
+ assert resp.is_sequence
+
+ # stream makes it a list no matter how the conversion is set
+ for val in True, False:
+ resp.implicit_sequence_conversion = val
+ resp.response = ("foo", "bar")
+ assert resp.is_sequence
+ resp.stream.write('baz')
+ assert resp.response == ['foo', 'bar', 'baz']
+
+ def test_form_data_ordering(self):
+ class MyRequest(wrappers.Request):
+ parameter_storage_class = ImmutableOrderedMultiDict
+
+ req = MyRequest.from_values('/?foo=1&bar=0&foo=3')
+ assert list(req.args) == ['foo', 'bar']
+ assert req.args.items(multi=True) == [
+ ('foo', '1'),
+ ('bar', '0'),
+ ('foo', '3')
+ ]
+ assert isinstance(req.args, ImmutableOrderedMultiDict)
+ assert isinstance(req.values, CombinedMultiDict)
+ assert req.values['foo'] == '1'
+ assert req.values.getlist('foo') == ['1', '3']
+
+ def test_storage_classes(self):
+ class MyRequest(wrappers.Request):
+ dict_storage_class = dict
+ list_storage_class = list
+ parameter_storage_class = dict
+ req = MyRequest.from_values('/?foo=baz', headers={
+ 'Cookie': 'foo=bar'
+ })
+ assert type(req.cookies) is dict
+ assert req.cookies == {'foo': 'bar'}
+ assert type(req.access_route) is list
+
+ assert type(req.args) is dict
+ assert type(req.values) is CombinedMultiDict
+ assert req.values['foo'] == 'baz'
+
+ req = wrappers.Request.from_values(headers={
+ 'Cookie': 'foo=bar'
+ })
+ assert type(req.cookies) is ImmutableTypeConversionDict
+ assert req.cookies == {'foo': 'bar'}
+ assert type(req.access_route) is ImmutableList
+
+ MyRequest.list_storage_class = tuple
+ req = MyRequest.from_values()
+ assert type(req.access_route) is tuple
+
+ def test_response_headers_passthrough(self):
+ headers = wrappers.Headers()
+ resp = wrappers.Response(headers=headers)
+ assert resp.headers is headers
+
+ def test_response_304_no_content_length(self):
+ resp = wrappers.Response('Test', status=304)
+ env = create_environ()
+ assert 'content-length' not in resp.get_wsgi_headers(env)
+
+ def test_ranges(self):
+ # basic range stuff
+ req = wrappers.Request.from_values()
+ assert req.range is None
+ req = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'})
+ assert req.range.ranges == [(0, 500)]
+
+ resp = wrappers.Response()
+ resp.content_range = req.range.make_content_range(1000)
+ assert resp.content_range.units == 'bytes'
+ assert resp.content_range.start == 0
+ assert resp.content_range.stop == 500
+ assert resp.content_range.length == 1000
+ assert resp.headers['Content-Range'] == 'bytes 0-499/1000'
+
+ resp.content_range.unset()
+ assert 'Content-Range' not in resp.headers
+
+ resp.headers['Content-Range'] = 'bytes 0-499/1000'
+ assert resp.content_range.units == 'bytes'
+ assert resp.content_range.start == 0
+ assert resp.content_range.stop == 500
+ assert resp.content_range.length == 1000
+
+ def test_auto_content_length(self):
+ resp = wrappers.Response('Hello World!')
+ assert resp.content_length == 12
+
+ resp = wrappers.Response(['Hello World!'])
+ assert resp.content_length is None
+ assert resp.get_wsgi_headers({})['Content-Length'] == '12'
+
+ def test_disabled_auto_content_length(self):
+ class MyResponse(wrappers.Response):
+ automatically_set_content_length = False
+ resp = MyResponse('Hello World!')
+ self.assert_(resp.content_length is None)
+
+ resp = MyResponse(['Hello World!'])
+ self.assert_(resp.content_length is None)
+ self.assert_('Content-Length' not in resp.get_wsgi_headers({}))
+
+ def test_location_header_autocorrect(self):
+ env = create_environ()
+ class MyResponse(wrappers.Response):
+ autocorrect_location_header = False
+ resp = MyResponse('Hello World!')
+ resp.headers['Location'] = '/test'
+ self.assert_equal(resp.get_wsgi_headers(env)['Location'], '/test')
+
+ resp = wrappers.Response('Hello World!')
+ resp.headers['Location'] = '/test'
+ self.assert_equal(resp.get_wsgi_headers(env)['Location'], 'http://localhost/test')
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(WrappersTestCase))
+ return suite
diff --git a/websdk/werkzeug/testsuite/wsgi.py b/websdk/werkzeug/testsuite/wsgi.py
new file mode 100644
index 0000000..6602fa2
--- /dev/null
+++ b/websdk/werkzeug/testsuite/wsgi.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+"""
+ werkzeug.testsuite.wsgi
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ Tests the WSGI utilities.
+
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+"""
+
+from __future__ import with_statement
+
+import unittest
+from os import path
+from cStringIO import StringIO
+
+from werkzeug.testsuite import WerkzeugTestCase
+
+from werkzeug.wrappers import BaseResponse
+from werkzeug.exceptions import BadRequest, ClientDisconnected
+from werkzeug.test import Client, create_environ, run_wsgi_app
+from werkzeug import wsgi
+
+
+class WSGIUtilsTestCase(WerkzeugTestCase):
+
+ def test_shareddatamiddleware_get_file_loader(self):
+ app = wsgi.SharedDataMiddleware(None, {})
+ assert callable(app.get_file_loader('foo'))
+
+ def test_shared_data_middleware(self):
+ def null_application(environ, start_response):
+ start_response('404 NOT FOUND', [('Content-Type', 'text/plain')])
+ yield 'NOT FOUND'
+ app = wsgi.SharedDataMiddleware(null_application, {
+ '/': path.join(path.dirname(__file__), 'res'),
+ '/sources': path.join(path.dirname(__file__), 'res'),
+ '/pkg': ('werkzeug.debug', 'shared')
+ })
+
+ for p in '/test.txt', '/sources/test.txt':
+ app_iter, status, headers = run_wsgi_app(app, create_environ(p))
+ assert status == '200 OK'
+ assert ''.join(app_iter).strip() == 'FOUND'
+
+ app_iter, status, headers = run_wsgi_app(app, create_environ('/pkg/debugger.js'))
+ contents = ''.join(app_iter)
+ assert '$(function() {' in contents
+
+ app_iter, status, headers = run_wsgi_app(app, create_environ('/missing'))
+ assert status == '404 NOT FOUND'
+ assert ''.join(app_iter).strip() == 'NOT FOUND'
+
+ def test_get_host(self):
+ env = {'HTTP_X_FORWARDED_HOST': 'example.org',
+ 'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
+ assert wsgi.get_host(env) == 'example.org'
+ assert wsgi.get_host(create_environ('/', 'http://example.org')) \
+ == 'example.org'
+
+ def test_responder(self):
+ def foo(environ, start_response):
+ return BaseResponse('Test')
+ client = Client(wsgi.responder(foo), BaseResponse)
+ response = client.get('/')
+ assert response.status_code == 200
+ assert response.data == 'Test'
+
+ def test_pop_path_info(self):
+ original_env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b///c'}
+
+ # regular path info popping
+ def assert_tuple(script_name, path_info):
+ assert env.get('SCRIPT_NAME') == script_name
+ assert env.get('PATH_INFO') == path_info
+ env = original_env.copy()
+ pop = lambda: wsgi.pop_path_info(env)
+
+ assert_tuple('/foo', '/a/b///c')
+ assert pop() == 'a'
+ assert_tuple('/foo/a', '/b///c')
+ assert pop() == 'b'
+ assert_tuple('/foo/a/b', '///c')
+ assert pop() == 'c'
+ assert_tuple('/foo/a/b///c', '')
+ assert pop() is None
+
+ def test_peek_path_info(self):
+ env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/aaa/b///c'}
+
+ assert wsgi.peek_path_info(env) == 'aaa'
+ assert wsgi.peek_path_info(env) == 'aaa'
+
+ def test_limited_stream(self):
+ class RaisingLimitedStream(wsgi.LimitedStream):
+ def on_exhausted(self):
+ raise BadRequest('input stream exhausted')
+
+ io = StringIO('123456')
+ stream = RaisingLimitedStream(io, 3)
+ assert stream.read() == '123'
+ self.assert_raises(BadRequest, stream.read)
+
+ io = StringIO('123456')
+ stream = RaisingLimitedStream(io, 3)
+ assert stream.read(1) == '1'
+ assert stream.read(1) == '2'
+ assert stream.read(1) == '3'
+ self.assert_raises(BadRequest, stream.read)
+
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readline() == '123456\n'
+ assert stream.readline() == 'ab'
+
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines() == ['123456\n', 'ab']
+
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines(2) == ['12']
+ assert stream.readlines(2) == ['34']
+ assert stream.readlines() == ['56\n', 'ab']
+
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readline(100) == '123456\n'
+
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines(100) == ['123456\n', 'ab']
+
+ io = StringIO('123456')
+ stream = wsgi.LimitedStream(io, 3)
+ assert stream.read(1) == '1'
+ assert stream.read(1) == '2'
+ assert stream.read() == '3'
+ assert stream.read() == ''
+
+ io = StringIO('123456')
+ stream = wsgi.LimitedStream(io, 3)
+ assert stream.read(-1) == '123'
+
+ def test_limited_stream_disconnection(self):
+ io = StringIO('A bit of content')
+
+ # disconnect detection on out of bytes
+ stream = wsgi.LimitedStream(io, 255)
+ with self.assert_raises(ClientDisconnected):
+ stream.read()
+
+ # disconnect detection because file close
+ io = StringIO('x' * 255)
+ io.close()
+ stream = wsgi.LimitedStream(io, 255)
+ with self.assert_raises(ClientDisconnected):
+ stream.read()
+
+ def test_path_info_extraction(self):
+ x = wsgi.extract_path_info('http://example.com/app', '/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app',
+ 'https://example.com/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app')
+ assert x == u'/'
+ x = wsgi.extract_path_info(u'http://☃.net/', u'/fööbär')
+ assert x == u'/fööbär'
+ x = wsgi.extract_path_info(u'http://☃.net/x', u'http://☃.net/x/fööbär')
+ assert x == u'/fööbär'
+
+ env = create_environ(u'/fööbär', u'http://☃.net/x/')
+ x = wsgi.extract_path_info(env, u'http://☃.net/x/fööbär')
+ assert x == u'/fööbär'
+
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/a/hello')
+ assert x is None
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app/hello',
+ collapse_http_schemes=False)
+ assert x is None
+
+ def test_get_host_fallback(self):
+ assert wsgi.get_host({
+ 'SERVER_NAME': 'foobar.example.com',
+ 'wsgi.url_scheme': 'http',
+ 'SERVER_PORT': '80'
+ }) == 'foobar.example.com'
+ assert wsgi.get_host({
+ 'SERVER_NAME': 'foobar.example.com',
+ 'wsgi.url_scheme': 'http',
+ 'SERVER_PORT': '81'
+ }) == 'foobar.example.com:81'
+
+ def test_multi_part_line_breaks(self):
+ data = 'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
+ test_stream = StringIO(data)
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=16))
+ assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n', 'ABCDEFGHIJK']
+
+ data = 'abc\r\nThis line is broken by the buffer length.\r\nFoo bar baz'
+ test_stream = StringIO(data)
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=24))
+ assert lines == ['abc\r\n', 'This line is broken by the buffer length.\r\n', 'Foo bar baz']
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(WSGIUtilsTestCase))
+ return suite