diff options
Diffstat (limited to 'websdk/werkzeug/testsuite/contrib')
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/__init__.py | 19 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/cache.py | 152 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/fixers.py | 187 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/iterio.py | 72 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/securecookie.py | 65 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/sessions.py | 81 | ||||
-rw-r--r-- | websdk/werkzeug/testsuite/contrib/wrappers.py | 97 |
7 files changed, 673 insertions, 0 deletions
diff --git a/websdk/werkzeug/testsuite/contrib/__init__.py b/websdk/werkzeug/testsuite/contrib/__init__.py new file mode 100644 index 0000000..b36a4ef --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.contrib + ~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the contrib modules. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +from werkzeug.testsuite import iter_suites + + +def suite(): + suite = unittest.TestSuite() + for other_suite in iter_suites(__name__): + suite.addTest(other_suite) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/cache.py b/websdk/werkzeug/testsuite/contrib/cache.py new file mode 100644 index 0000000..209189a --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/cache.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.cache + ~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the cache system + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import os +import time +import unittest +import tempfile +import shutil + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.contrib import cache + +try: + import redis +except ImportError: + redis = None + + + +class SimpleCacheTestCase(WerkzeugTestCase): + + def test_get_dict(self): + c = cache.SimpleCache() + c.set('a', 'a') + c.set('b', 'b') + d = c.get_dict('a', 'b') + assert 'a' in d + assert 'a' == d['a'] + assert 'b' in d + assert 'b' == d['b'] + + def test_set_many(self): + c = cache.SimpleCache() + c.set_many({0: 0, 1: 1, 2: 4}) + assert c.get(2) == 4 + c.set_many((i, i*i) for i in xrange(3)) + assert c.get(2) == 4 + + +class FileSystemCacheTestCase(WerkzeugTestCase): + + def test_set_get(self): + tmp_dir = tempfile.mkdtemp() + try: + c = cache.FileSystemCache(cache_dir=tmp_dir) + for i in range(3): + c.set(str(i), i * i) + for i in range(3): + result = c.get(str(i)) + assert result == i * i + finally: + shutil.rmtree(tmp_dir) + + def test_filesystemcache_prune(self): + THRESHOLD = 13 + tmp_dir = tempfile.mkdtemp() + c = cache.FileSystemCache(cache_dir=tmp_dir, threshold=THRESHOLD) + for i in range(2 * THRESHOLD): + c.set(str(i), i) + cache_files = os.listdir(tmp_dir) + shutil.rmtree(tmp_dir) + assert len(cache_files) <= THRESHOLD + + + def test_filesystemcache_clear(self): + tmp_dir = tempfile.mkdtemp() + c = cache.FileSystemCache(cache_dir=tmp_dir) + c.set('foo', 'bar') + cache_files = os.listdir(tmp_dir) + assert len(cache_files) == 1 + c.clear() + cache_files = os.listdir(tmp_dir) + assert len(cache_files) == 0 + shutil.rmtree(tmp_dir) + + +class RedisCacheTestCase(WerkzeugTestCase): + + def make_cache(self): + return cache.RedisCache(key_prefix='werkzeug-test-case:') + + def teardown(self): + self.make_cache().clear() + + def test_get_set(self): + c = self.make_cache() + c.set('foo', 'bar') + assert c.get('foo') == 'bar' + + def test_get_many(self): + c = self.make_cache() + c.set('foo', 'bar') + c.set('spam', 'eggs') + assert c.get_many('foo', 'spam') == ['bar', 'eggs'] + + def test_set_many(self): + c = self.make_cache() + c.set_many({'foo': 'bar', 'spam': 'eggs'}) + assert c.get('foo') == 'bar' + assert c.get('spam') == 'eggs' + + def test_expire(self): + c = self.make_cache() + c.set('foo', 'bar', 1) + time.sleep(2) + assert c.get('foo') is None + + def test_add(self): + c = self.make_cache() + # sanity check that add() works like set() + c.add('foo', 'bar') + assert c.get('foo') == 'bar' + c.add('foo', 'qux') + assert c.get('foo') == 'bar' + + def test_delete(self): + c = self.make_cache() + c.add('foo', 'bar') + assert c.get('foo') == 'bar' + c.delete('foo') + assert c.get('foo') is None + + def test_delete_many(self): + c = self.make_cache() + c.add('foo', 'bar') + c.add('spam', 'eggs') + c.delete_many('foo', 'spam') + assert c.get('foo') is None + assert c.get('spam') is None + + def test_inc_dec(self): + c = self.make_cache() + c.set('foo', 1) + assert c.inc('foo') == 2 + assert c.dec('foo') == 1 + c.delete('foo') + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SimpleCacheTestCase)) + suite.addTest(unittest.makeSuite(FileSystemCacheTestCase)) + if redis is not None: + suite.addTest(unittest.makeSuite(RedisCacheTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/fixers.py b/websdk/werkzeug/testsuite/contrib/fixers.py new file mode 100644 index 0000000..0422f8b --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/fixers.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.fixers + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + Server / Browser fixers. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.datastructures import ResponseCacheControl +from werkzeug.http import parse_cache_control_header + +from werkzeug.test import create_environ, Client +from werkzeug.wrappers import Request, Response +from werkzeug.contrib import fixers +from werkzeug.utils import redirect + + +@Request.application +def path_check_app(request): + return Response('PATH_INFO: %s\nSCRIPT_NAME: %s' % ( + request.environ.get('PATH_INFO', ''), + request.environ.get('SCRIPT_NAME', '') + )) + + +class ServerFixerTestCase(WerkzeugTestCase): + + def test_lighttpd_cgi_root_fix(self): + app = fixers.LighttpdCGIRootFix(path_check_app) + response = Response.from_app(app, dict(create_environ(), + SCRIPT_NAME='/foo', + PATH_INFO='/bar', + SERVER_SOFTWARE='lighttpd/1.4.27' + )) + assert response.data == 'PATH_INFO: /foo/bar\nSCRIPT_NAME: ' + + def test_path_info_from_request_uri_fix(self): + app = fixers.PathInfoFromRequestUriFix(path_check_app) + for key in 'REQUEST_URI', 'REQUEST_URL', 'UNENCODED_URL': + env = dict(create_environ(), SCRIPT_NAME='/test', PATH_INFO='/?????') + env[key] = '/test/foo%25bar?drop=this' + response = Response.from_app(app, env) + assert response.data == 'PATH_INFO: /foo%bar\nSCRIPT_NAME: /test' + + def test_proxy_fix(self): + """Test the ProxyFix fixer""" + @fixers.ProxyFix + @Request.application + def app(request): + return Response('%s|%s' % ( + request.remote_addr, + # do not use request.host as this fixes too :) + request.environ['HTTP_HOST'] + )) + environ = dict(create_environ(), + HTTP_X_FORWARDED_PROTO="https", + HTTP_X_FORWARDED_HOST='example.com', + HTTP_X_FORWARDED_FOR='1.2.3.4, 5.6.7.8', + REMOTE_ADDR='127.0.0.1', + HTTP_HOST='fake' + ) + + response = Response.from_app(app, environ) + + assert response.data == '1.2.3.4|example.com' + + # And we must check that if it is a redirection it is + # correctly done: + + redirect_app = redirect('/foo/bar.hml') + response = Response.from_app(redirect_app, environ) + + wsgi_headers = response.get_wsgi_headers(environ) + assert wsgi_headers['Location'] == 'https://example.com/foo/bar.hml' + + def test_proxy_fix_weird_enum(self): + @fixers.ProxyFix + @Request.application + def app(request): + return Response(request.remote_addr) + environ = dict(create_environ(), + HTTP_X_FORWARDED_FOR=',', + REMOTE_ADDR='127.0.0.1', + ) + + response = Response.from_app(app, environ) + self.assert_equal(response.data, '127.0.0.1') + + def test_header_rewriter_fix(self): + """Test the HeaderRewriterFix fixer""" + @Request.application + def application(request): + return Response("", headers=[ + ('X-Foo', 'bar') + ]) + application = fixers.HeaderRewriterFix(application, ('X-Foo',), (('X-Bar', '42'),)) + response = Response.from_app(application, create_environ()) + assert response.headers['Content-Type'] == 'text/plain; charset=utf-8' + assert 'X-Foo' not in response.headers + assert response.headers['X-Bar'] == '42' + + +class BrowserFixerTestCase(WerkzeugTestCase): + + def test_ie_fixes(self): + @fixers.InternetExplorerFix + @Request.application + def application(request): + response = Response('binary data here', mimetype='application/vnd.ms-excel') + response.headers['Vary'] = 'Cookie' + response.headers['Content-Disposition'] = 'attachment; filename=foo.xls' + return response + + c = Client(application, Response) + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + + # IE gets no vary + assert response.data == 'binary data here' + assert 'vary' not in response.headers + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + assert response.headers['content-type'] == 'application/vnd.ms-excel' + + # other browsers do + c = Client(application, Response) + response = c.get('/') + assert response.data == 'binary data here' + assert 'vary' in response.headers + + cc = ResponseCacheControl() + cc.no_cache = True + + @fixers.InternetExplorerFix + @Request.application + def application(request): + response = Response('binary data here', mimetype='application/vnd.ms-excel') + response.headers['Pragma'] = ', '.join(pragma) + response.headers['Cache-Control'] = cc.to_header() + response.headers['Content-Disposition'] = 'attachment; filename=foo.xls' + return response + + + # IE has no pragma or cache control + pragma = ('no-cache',) + c = Client(application, Response) + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + assert response.data == 'binary data here' + assert 'pragma' not in response.headers + assert 'cache-control' not in response.headers + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + # IE has simplified pragma + pragma = ('no-cache', 'x-foo') + cc.proxy_revalidate = True + response = c.get('/', headers=[ + ('User-Agent', 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)') + ]) + assert response.data == 'binary data here' + assert response.headers['pragma'] == 'x-foo' + assert response.headers['cache-control'] == 'proxy-revalidate' + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + # regular browsers get everything + response = c.get('/') + assert response.data == 'binary data here' + assert response.headers['pragma'] == 'no-cache, x-foo' + cc = parse_cache_control_header(response.headers['cache-control'], + cls=ResponseCacheControl) + assert cc.no_cache + assert cc.proxy_revalidate + assert response.headers['content-disposition'] == 'attachment; filename=foo.xls' + + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ServerFixerTestCase)) + suite.addTest(unittest.makeSuite(BrowserFixerTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/iterio.py b/websdk/werkzeug/testsuite/contrib/iterio.py new file mode 100644 index 0000000..cddbd6e --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/iterio.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.iterio + ~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the iterio object. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase +from werkzeug.contrib.iterio import IterIO, greenlet + + +class IterOTestSuite(WerkzeugTestCase): + + def test_basic(self): + io = IterIO(["Hello", "World", "1", "2", "3"]) + assert io.tell() == 0 + assert io.read(2) == "He" + assert io.tell() == 2 + assert io.read(3) == "llo" + assert io.tell() == 5 + io.seek(0) + assert io.read(5) == "Hello" + assert io.tell() == 5 + assert io._buf == "Hello" + assert io.read() == "World123" + assert io.tell() == 13 + io.close() + assert io.closed + + io = IterIO(["Hello\n", "World!"]) + assert io.readline() == 'Hello\n' + assert io._buf == 'Hello\n' + assert io.read() == 'World!' + assert io._buf == 'Hello\nWorld!' + assert io.tell() == 12 + io.seek(0) + assert io.readlines() == ['Hello\n', 'World!'] + + io = IterIO(["foo\n", "bar"]) + io.seek(-4, 2) + assert io.read(4) == '\nbar' + + self.assert_raises(IOError, io.seek, 2, 100) + io.close() + self.assert_raises(ValueError, io.read) + + +class IterITestSuite(WerkzeugTestCase): + + def test_basic(self): + def producer(out): + out.write('1\n') + out.write('2\n') + out.flush() + out.write('3\n') + iterable = IterIO(producer) + self.assert_equal(iterable.next(), '1\n2\n') + self.assert_equal(iterable.next(), '3\n') + self.assert_raises(StopIteration, iterable.next) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(IterOTestSuite)) + if greenlet is not None: + suite.addTest(unittest.makeSuite(IterITestSuite)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/securecookie.py b/websdk/werkzeug/testsuite/contrib/securecookie.py new file mode 100644 index 0000000..fb10fa1 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/securecookie.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.securecookie + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Tests the secure cookie. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.utils import parse_cookie +from werkzeug.wrappers import Request, Response +from werkzeug.contrib.securecookie import SecureCookie + + +class SecureCookieTestCase(WerkzeugTestCase): + + def test_basic_support(self): + c = SecureCookie(secret_key='foo') + assert c.new + print c.modified, c.should_save + assert not c.modified + assert not c.should_save + c['x'] = 42 + assert c.modified + assert c.should_save + s = c.serialize() + + c2 = SecureCookie.unserialize(s, 'foo') + assert c is not c2 + assert not c2.new + assert not c2.modified + assert not c2.should_save + assert c2 == c + + c3 = SecureCookie.unserialize(s, 'wrong foo') + assert not c3.modified + assert not c3.new + assert c3 == {} + + def test_wrapper_support(self): + req = Request.from_values() + resp = Response() + c = SecureCookie.load_cookie(req, secret_key='foo') + assert c.new + c['foo'] = 42 + assert c.secret_key == 'foo' + c.save_cookie(resp) + + req = Request.from_values(headers={ + 'Cookie': 'session="%s"' % parse_cookie(resp.headers['set-cookie'])['session'] + }) + c2 = SecureCookie.load_cookie(req, secret_key='foo') + assert not c2.new + assert c2 == c + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SecureCookieTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/sessions.py b/websdk/werkzeug/testsuite/contrib/sessions.py new file mode 100644 index 0000000..18015a6 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/sessions.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.sessions + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Added tests for the sessions. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" +import unittest +import shutil + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.contrib.sessions import FilesystemSessionStore + +from tempfile import mkdtemp, gettempdir + + +class SessionTestCase(WerkzeugTestCase): + + def setup(self): + self.session_folder = mkdtemp() + + def teardown(self): + shutil.rmtree(self.session_folder) + + def test_default_tempdir(self): + store = FilesystemSessionStore() + assert store.path == gettempdir() + + def test_basic_fs_sessions(self): + store = FilesystemSessionStore(self.session_folder) + x = store.new() + assert x.new + assert not x.modified + x['foo'] = [1, 2, 3] + assert x.modified + store.save(x) + + x2 = store.get(x.sid) + assert not x2.new + assert not x2.modified + assert x2 is not x + assert x2 == x + x2['test'] = 3 + assert x2.modified + assert not x2.new + store.save(x2) + + x = store.get(x.sid) + store.delete(x) + x2 = store.get(x.sid) + # the session is not new when it was used previously. + assert not x2.new + + def test_renewing_fs_session(self): + store = FilesystemSessionStore(self.session_folder, renew_missing=True) + x = store.new() + store.save(x) + store.delete(x) + x2 = store.get(x.sid) + assert x2.new + + def test_fs_session_lising(self): + store = FilesystemSessionStore(self.session_folder, renew_missing=True) + sessions = set() + for x in xrange(10): + sess = store.new() + store.save(sess) + sessions.add(sess.sid) + + listed_sessions = set(store.list()) + assert sessions == listed_sessions + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(SessionTestCase)) + return suite diff --git a/websdk/werkzeug/testsuite/contrib/wrappers.py b/websdk/werkzeug/testsuite/contrib/wrappers.py new file mode 100644 index 0000000..e22d9a4 --- /dev/null +++ b/websdk/werkzeug/testsuite/contrib/wrappers.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.testsuite.contrib.wrappers + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Added tests for the sessions. + + :copyright: (c) 2011 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +from __future__ import with_statement + +import unittest + +from werkzeug.testsuite import WerkzeugTestCase + +from werkzeug.contrib import wrappers +from werkzeug import routing +from werkzeug.wrappers import Request, Response + + +class WrappersTestCase(WerkzeugTestCase): + + def test_reverse_slash_behavior(self): + class MyRequest(wrappers.ReverseSlashBehaviorRequestMixin, Request): + pass + req = MyRequest.from_values('/foo/bar', 'http://example.com/test') + assert req.url == 'http://example.com/test/foo/bar' + assert req.path == 'foo/bar' + assert req.script_root == '/test/' + + # make sure the routing system works with the slashes in + # reverse order as well. + map = routing.Map([routing.Rule('/foo/bar', endpoint='foo')]) + adapter = map.bind_to_environ(req.environ) + assert adapter.match() == ('foo', {}) + adapter = map.bind(req.host, req.script_root) + assert adapter.match(req.path) == ('foo', {}) + + def test_dynamic_charset_request_mixin(self): + class MyRequest(wrappers.DynamicCharsetRequestMixin, Request): + pass + env = {'CONTENT_TYPE': 'text/html'} + req = MyRequest(env) + assert req.charset == 'latin1' + + env = {'CONTENT_TYPE': 'text/html; charset=utf-8'} + req = MyRequest(env) + assert req.charset == 'utf-8' + + env = {'CONTENT_TYPE': 'application/octet-stream'} + req = MyRequest(env) + assert req.charset == 'latin1' + assert req.url_charset == 'latin1' + + MyRequest.url_charset = 'utf-8' + env = {'CONTENT_TYPE': 'application/octet-stream'} + req = MyRequest(env) + assert req.charset == 'latin1' + assert req.url_charset == 'utf-8' + + def return_ascii(x): + return "ascii" + env = {'CONTENT_TYPE': 'text/plain; charset=x-weird-charset'} + req = MyRequest(env) + req.unknown_charset = return_ascii + assert req.charset == 'ascii' + assert req.url_charset == 'utf-8' + + def test_dynamic_charset_response_mixin(self): + class MyResponse(wrappers.DynamicCharsetResponseMixin, Response): + default_charset = 'utf-7' + resp = MyResponse(mimetype='text/html') + assert resp.charset == 'utf-7' + resp.charset = 'utf-8' + assert resp.charset == 'utf-8' + assert resp.mimetype == 'text/html' + assert resp.mimetype_params == {'charset': 'utf-8'} + resp.mimetype_params['charset'] = 'iso-8859-15' + assert resp.charset == 'iso-8859-15' + resp.data = u'Hällo Wörld' + assert ''.join(resp.iter_encoded()) == \ + u'Hällo Wörld'.encode('iso-8859-15') + del resp.headers['content-type'] + try: + resp.charset = 'utf-8' + except TypeError, e: + pass + else: + assert False, 'expected type error on charset setting without ct' + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(WrappersTestCase)) + return suite |