Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
path: root/websdk/werkzeug/testsuite/wsgi.py
diff options
Diffstat (limited to 'websdk/werkzeug/testsuite/wsgi.py')
1 files changed, 217 insertions, 0 deletions
diff --git a/websdk/werkzeug/testsuite/wsgi.py b/websdk/werkzeug/testsuite/wsgi.py
new file mode 100644
index 0000000..6602fa2
--- /dev/null
+++ b/websdk/werkzeug/testsuite/wsgi.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+ werkzeug.testsuite.wsgi
+ ~~~~~~~~~~~~~~~~~~~~~~~
+ Tests the WSGI utilities.
+ :copyright: (c) 2011 by Armin Ronacher.
+ :license: BSD, see LICENSE for more details.
+from __future__ import with_statement
+import unittest
+from os import path
+from cStringIO import StringIO
+from werkzeug.testsuite import WerkzeugTestCase
+from werkzeug.wrappers import BaseResponse
+from werkzeug.exceptions import BadRequest, ClientDisconnected
+from werkzeug.test import Client, create_environ, run_wsgi_app
+from werkzeug import wsgi
+class WSGIUtilsTestCase(WerkzeugTestCase):
+ def test_shareddatamiddleware_get_file_loader(self):
+ app = wsgi.SharedDataMiddleware(None, {})
+ assert callable(app.get_file_loader('foo'))
+ def test_shared_data_middleware(self):
+ def null_application(environ, start_response):
+ start_response('404 NOT FOUND', [('Content-Type', 'text/plain')])
+ yield 'NOT FOUND'
+ app = wsgi.SharedDataMiddleware(null_application, {
+ '/': path.join(path.dirname(__file__), 'res'),
+ '/sources': path.join(path.dirname(__file__), 'res'),
+ '/pkg': ('werkzeug.debug', 'shared')
+ })
+ for p in '/test.txt', '/sources/test.txt':
+ app_iter, status, headers = run_wsgi_app(app, create_environ(p))
+ assert status == '200 OK'
+ assert ''.join(app_iter).strip() == 'FOUND'
+ app_iter, status, headers = run_wsgi_app(app, create_environ('/pkg/debugger.js'))
+ contents = ''.join(app_iter)
+ assert '$(function() {' in contents
+ app_iter, status, headers = run_wsgi_app(app, create_environ('/missing'))
+ assert status == '404 NOT FOUND'
+ assert ''.join(app_iter).strip() == 'NOT FOUND'
+ def test_get_host(self):
+ env = {'HTTP_X_FORWARDED_HOST': 'example.org',
+ 'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
+ assert wsgi.get_host(env) == 'example.org'
+ assert wsgi.get_host(create_environ('/', 'http://example.org')) \
+ == 'example.org'
+ def test_responder(self):
+ def foo(environ, start_response):
+ return BaseResponse('Test')
+ client = Client(wsgi.responder(foo), BaseResponse)
+ response = client.get('/')
+ assert response.status_code == 200
+ assert response.data == 'Test'
+ def test_pop_path_info(self):
+ original_env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b///c'}
+ # regular path info popping
+ def assert_tuple(script_name, path_info):
+ assert env.get('SCRIPT_NAME') == script_name
+ assert env.get('PATH_INFO') == path_info
+ env = original_env.copy()
+ pop = lambda: wsgi.pop_path_info(env)
+ assert_tuple('/foo', '/a/b///c')
+ assert pop() == 'a'
+ assert_tuple('/foo/a', '/b///c')
+ assert pop() == 'b'
+ assert_tuple('/foo/a/b', '///c')
+ assert pop() == 'c'
+ assert_tuple('/foo/a/b///c', '')
+ assert pop() is None
+ def test_peek_path_info(self):
+ env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/aaa/b///c'}
+ assert wsgi.peek_path_info(env) == 'aaa'
+ assert wsgi.peek_path_info(env) == 'aaa'
+ def test_limited_stream(self):
+ class RaisingLimitedStream(wsgi.LimitedStream):
+ def on_exhausted(self):
+ raise BadRequest('input stream exhausted')
+ io = StringIO('123456')
+ stream = RaisingLimitedStream(io, 3)
+ assert stream.read() == '123'
+ self.assert_raises(BadRequest, stream.read)
+ io = StringIO('123456')
+ stream = RaisingLimitedStream(io, 3)
+ assert stream.read(1) == '1'
+ assert stream.read(1) == '2'
+ assert stream.read(1) == '3'
+ self.assert_raises(BadRequest, stream.read)
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readline() == '123456\n'
+ assert stream.readline() == 'ab'
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines() == ['123456\n', 'ab']
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines(2) == ['12']
+ assert stream.readlines(2) == ['34']
+ assert stream.readlines() == ['56\n', 'ab']
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readline(100) == '123456\n'
+ io = StringIO('123456\nabcdefg')
+ stream = wsgi.LimitedStream(io, 9)
+ assert stream.readlines(100) == ['123456\n', 'ab']
+ io = StringIO('123456')
+ stream = wsgi.LimitedStream(io, 3)
+ assert stream.read(1) == '1'
+ assert stream.read(1) == '2'
+ assert stream.read() == '3'
+ assert stream.read() == ''
+ io = StringIO('123456')
+ stream = wsgi.LimitedStream(io, 3)
+ assert stream.read(-1) == '123'
+ def test_limited_stream_disconnection(self):
+ io = StringIO('A bit of content')
+ # disconnect detection on out of bytes
+ stream = wsgi.LimitedStream(io, 255)
+ with self.assert_raises(ClientDisconnected):
+ stream.read()
+ # disconnect detection because file close
+ io = StringIO('x' * 255)
+ io.close()
+ stream = wsgi.LimitedStream(io, 255)
+ with self.assert_raises(ClientDisconnected):
+ stream.read()
+ def test_path_info_extraction(self):
+ x = wsgi.extract_path_info('http://example.com/app', '/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app',
+ 'https://example.com/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app/hello')
+ assert x == u'/hello'
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app')
+ assert x == u'/'
+ x = wsgi.extract_path_info(u'http://☃.net/', u'/fööbär')
+ assert x == u'/fööbär'
+ x = wsgi.extract_path_info(u'http://☃.net/x', u'http://☃.net/x/fööbär')
+ assert x == u'/fööbär'
+ env = create_environ(u'/fööbär', u'http://☃.net/x/')
+ x = wsgi.extract_path_info(env, u'http://☃.net/x/fööbär')
+ assert x == u'/fööbär'
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/a/hello')
+ assert x is None
+ x = wsgi.extract_path_info('http://example.com/app/',
+ 'https://example.com/app/hello',
+ collapse_http_schemes=False)
+ assert x is None
+ def test_get_host_fallback(self):
+ assert wsgi.get_host({
+ 'SERVER_NAME': 'foobar.example.com',
+ 'wsgi.url_scheme': 'http',
+ 'SERVER_PORT': '80'
+ }) == 'foobar.example.com'
+ assert wsgi.get_host({
+ 'SERVER_NAME': 'foobar.example.com',
+ 'wsgi.url_scheme': 'http',
+ 'SERVER_PORT': '81'
+ }) == 'foobar.example.com:81'
+ def test_multi_part_line_breaks(self):
+ data = 'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
+ test_stream = StringIO(data)
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=16))
+ assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n', 'ABCDEFGHIJK']
+ data = 'abc\r\nThis line is broken by the buffer length.\r\nFoo bar baz'
+ test_stream = StringIO(data)
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=24))
+ assert lines == ['abc\r\n', 'This line is broken by the buffer length.\r\n', 'Foo bar baz']
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(WSGIUtilsTestCase))
+ return suite