Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
path: root/creactiweb/_templates/lib/werkzeug/test.py
diff options
Diffstat (limited to 'creactiweb/_templates/lib/werkzeug/test.py')
1 files changed, 808 insertions, 0 deletions
diff --git a/creactiweb/_templates/lib/werkzeug/test.py b/creactiweb/_templates/lib/werkzeug/test.py
new file mode 100644
index 0000000..16a3927
--- /dev/null
+++ b/creactiweb/_templates/lib/werkzeug/test.py
@@ -0,0 +1,808 @@
+# -*- coding: utf-8 -*-
+ werkzeug.test
+ ~~~~~~~~~~~~~
+ This module implements a client to WSGI applications for testing.
+ :copyright: (c) 2010 by the Werkzeug Team, see AUTHORS for more details.
+ :license: BSD, see LICENSE for more details.
+import sys
+import urllib
+import urlparse
+import mimetypes
+from time import time
+from random import random
+from itertools import chain
+from tempfile import TemporaryFile
+from cStringIO import StringIO
+from cookielib import CookieJar
+from urllib2 import Request as U2Request
+from werkzeug._internal import _empty_stream, _get_environ
+from werkzeug.wrappers import BaseRequest
+from werkzeug.urls import url_encode, url_fix, iri_to_uri
+from werkzeug.wsgi import get_host, get_current_url
+from werkzeug.datastructures import FileMultiDict, MultiDict, \
+ CombinedMultiDict, Headers, FileStorage
+def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
+ boundary=None, charset='utf-8'):
+ """Encode a dict of values (either strings or file descriptors or
+ :class:`FileStorage` objects.) into a multipart encoded string stored
+ in a file descriptor.
+ """
+ if boundary is None:
+ boundary = '---------------WerkzeugFormPart_%s%s' % (time(), random())
+ _closure = [StringIO(), 0, False]
+ if use_tempfile:
+ def write(string):
+ stream, total_length, on_disk = _closure
+ if on_disk:
+ stream.write(string)
+ else:
+ length = len(string)
+ if length + _closure[1] <= threshold:
+ stream.write(string)
+ else:
+ new_stream = TemporaryFile('wb+')
+ new_stream.write(stream.getvalue())
+ new_stream.write(string)
+ _closure[0] = new_stream
+ _closure[2] = True
+ _closure[1] = total_length + length
+ else:
+ write = _closure[0].write
+ if not isinstance(values, MultiDict):
+ values = MultiDict(values)
+ for key, values in values.iterlists():
+ for value in values:
+ write('--%s\r\nContent-Disposition: form-data; name="%s"' %
+ (boundary, key))
+ reader = getattr(value, 'read', None)
+ if reader is not None:
+ filename = getattr(value, 'filename',
+ getattr(value, 'name', None))
+ content_type = getattr(value, 'content_type', None)
+ if content_type is None:
+ content_type = filename and \
+ mimetypes.guess_type(filename)[0] or \
+ 'application/octet-stream'
+ if filename is not None:
+ write('; filename="%s"\r\n' % filename)
+ else:
+ write('\r\n')
+ write('Content-Type: %s\r\n\r\n' % content_type)
+ while 1:
+ chunk = reader(16384)
+ if not chunk:
+ break
+ write(chunk)
+ else:
+ if isinstance(value, unicode):
+ value = value.encode(charset)
+ write('\r\n\r\n' + value)
+ write('\r\n')
+ write('--%s--\r\n' % boundary)
+ length = int(_closure[0].tell())
+ _closure[0].seek(0)
+ return _closure[0], length, boundary
+def encode_multipart(values, boundary=None, charset='utf-8'):
+ """Like `stream_encode_multipart` but returns a tuple in the form
+ (``boundary``, ``data``) where data is a bytestring.
+ """
+ stream, length, boundary = stream_encode_multipart(
+ values, use_tempfile=False, boundary=boundary, charset=charset)
+ return boundary, stream.read()
+def File(fd, filename=None, mimetype=None):
+ """Backwards compat."""
+ from warnings import warn
+ warn(DeprecationWarning('werkzeug.test.File is deprecated, use the '
+ 'EnvironBuilder or FileStorage instead'))
+ return FileStorage(fd, filename=filename, content_type=mimetype)
+class _TestCookieHeaders(object):
+ """A headers adapter for cookielib
+ """
+ def __init__(self, headers):
+ self.headers = headers
+ def getheaders(self, name):
+ headers = []
+ name = name.lower()
+ for k, v in self.headers:
+ if k.lower() == name:
+ headers.append(v)
+ return headers
+class _TestCookieResponse(object):
+ """Something that looks like a httplib.HTTPResponse, but is actually just an
+ adapter for our test responses to make them available for cookielib.
+ """
+ def __init__(self, headers):
+ self.headers = _TestCookieHeaders(headers)
+ def info(self):
+ return self.headers
+class _TestCookieJar(CookieJar):
+ """A cookielib.CookieJar modified to inject and read cookie headers from
+ and to wsgi environments, and wsgi application responses.
+ """
+ def inject_wsgi(self, environ):
+ """Inject the cookies as client headers into the server's wsgi
+ environment.
+ """
+ cvals = []
+ for cookie in self:
+ cvals.append('%s=%s' % (cookie.name, cookie.value))
+ if cvals:
+ environ['HTTP_COOKIE'] = ', '.join(cvals)
+ def extract_wsgi(self, environ, headers):
+ """Extract the server's set-cookie headers as cookies into the
+ cookie jar.
+ """
+ self.extract_cookies(
+ _TestCookieResponse(headers),
+ U2Request(get_current_url(environ)),
+ )
+def _iter_data(data):
+ """Iterates over a dict or multidict yielding all keys and values.
+ This is used to iterate over the data passed to the
+ :class:`EnvironBuilder`.
+ """
+ if isinstance(data, MultiDict):
+ for key, values in data.iterlists():
+ for value in values:
+ yield key, value
+ else:
+ for key, values in data.iteritems():
+ if isinstance(values, list):
+ for value in values:
+ yield key, value
+ else:
+ yield key, values
+class EnvironBuilder(object):
+ """This class can be used to conveniently create a WSGI environment
+ for testing purposes. It can be used to quickly create WSGI environments
+ or request objects from arbitrary data.
+ The signature of this class is also used in some other places as of
+ Werkzeug 0.5 (:func:`create_environ`, :meth:`BaseResponse.from_values`,
+ :meth:`Client.open`). Because of this most of the functionality is
+ available through the constructor alone.
+ Files and regular form data can be manipulated independently of each
+ other with the :attr:`form` and :attr:`files` attributes, but are
+ passed with the same argument to the constructor: `data`.
+ `data` can be any of these values:
+ - a `str`: If it's a string it is converted into a :attr:`input_stream`,
+ the :attr:`content_length` is set and you have to provide a
+ :attr:`content_type`.
+ - a `dict`: If it's a dict the keys have to be strings and the values
+ any of the following objects:
+ - a :class:`file`-like object. These are converted into
+ :class:`FileStorage` objects automatically.
+ - a tuple. The :meth:`~FileMultiDict.add_file` method is called
+ with the tuple items as positional arguments.
+ .. versionadded:: 0.6
+ `path` and `base_url` can now be unicode strings that are encoded using
+ the :func:`iri_to_uri` function.
+ :param path: the path of the request. In the WSGI environment this will
+ end up as `PATH_INFO`. If the `query_string` is not defined
+ and there is a question mark in the `path` everything after
+ it is used as query string.
+ :param base_url: the base URL is a URL that is used to extract the WSGI
+ URL scheme, host (server name + server port) and the
+ script root (`SCRIPT_NAME`).
+ :param query_string: an optional string or dict with URL parameters.
+ :param method: the HTTP method to use, defaults to `GET`.
+ :param input_stream: an optional input stream. Do not specify this and
+ `data`. As soon as an input stream is set you can't
+ modify :attr:`args` and :attr:`files` unless you
+ set the :attr:`input_stream` to `None` again.
+ :param content_type: The content type for the request. As of 0.5 you
+ don't have to provide this when specifying files
+ and form data via `data`.
+ :param content_length: The content length for the request. You don't
+ have to specify this when providing data via
+ `data`.
+ :param errors_stream: an optional error stream that is used for
+ `wsgi.errors`. Defaults to :data:`stderr`.
+ :param multithread: controls `wsgi.multithread`. Defaults to `False`.
+ :param multiprocess: controls `wsgi.multiprocess`. Defaults to `False`.
+ :param run_once: controls `wsgi.run_once`. Defaults to `False`.
+ :param headers: an optional list or :class:`Headers` object of headers.
+ :param data: a string or dict of form data. See explanation above.
+ :param environ_base: an optional dict of environment defaults.
+ :param environ_overrides: an optional dict of environment overrides.
+ :param charset: the charset used to encode unicode data.
+ """
+ #: the server protocol to use. defaults to HTTP/1.1
+ server_protocol = 'HTTP/1.1'
+ #: the wsgi version to use. defaults to (1, 0)
+ wsgi_version = (1, 0)
+ #: the default request class for :meth:`get_request`
+ request_class = BaseRequest
+ def __init__(self, path='/', base_url=None, query_string=None,
+ method='GET', input_stream=None, content_type=None,
+ content_length=None, errors_stream=None, multithread=False,
+ multiprocess=False, run_once=False, headers=None, data=None,
+ environ_base=None, environ_overrides=None, charset='utf-8'):
+ if query_string is None and '?' in path:
+ path, query_string = path.split('?', 1)
+ self.charset = charset
+ if isinstance(path, unicode):
+ path = iri_to_uri(path, charset)
+ self.path = path
+ if base_url is not None:
+ if isinstance(base_url, unicode):
+ base_url = iri_to_uri(base_url, charset)
+ else:
+ base_url = url_fix(base_url, charset)
+ self.base_url = base_url
+ if isinstance(query_string, basestring):
+ self.query_string = query_string
+ else:
+ if query_string is None:
+ query_string = MultiDict()
+ elif not isinstance(query_string, MultiDict):
+ query_string = MultiDict(query_string)
+ self.args = query_string
+ self.method = method
+ if headers is None:
+ headers = Headers()
+ elif not isinstance(headers, Headers):
+ headers = Headers(headers)
+ self.headers = headers
+ self.content_type = content_type
+ if errors_stream is None:
+ errors_stream = sys.stderr
+ self.errors_stream = errors_stream
+ self.multithread = multithread
+ self.multiprocess = multiprocess
+ self.run_once = run_once
+ self.environ_base = environ_base
+ self.environ_overrides = environ_overrides
+ self.input_stream = input_stream
+ self.content_length = content_length
+ self.closed = False
+ if data:
+ if input_stream is not None:
+ raise TypeError('can\'t provide input stream and data')
+ if isinstance(data, basestring):
+ self.input_stream = StringIO(data)
+ if self.content_length is None:
+ self.content_length = len(data)
+ else:
+ for key, value in _iter_data(data):
+ if isinstance(value, (tuple, dict)) or \
+ hasattr(value, 'read'):
+ self._add_file_from_data(key, value)
+ else:
+ self.form.setlistdefault(key).append(value)
+ def _add_file_from_data(self, key, value):
+ """Called in the EnvironBuilder to add files from the data dict."""
+ if isinstance(value, tuple):
+ self.files.add_file(key, *value)
+ elif isinstance(value, dict):
+ from warnings import warn
+ warn(DeprecationWarning('it\'s no longer possible to pass dicts '
+ 'as `data`. Use tuples or FileStorage '
+ 'objects instead'), stacklevel=2)
+ args = v
+ value = dict(value)
+ mimetype = value.pop('mimetype', None)
+ if mimetype is not None:
+ value['content_type'] = mimetype
+ self.files.add_file(key, **value)
+ else:
+ self.files.add_file(key, value)
+ def _get_base_url(self):
+ return urlparse.urlunsplit((self.url_scheme, self.host,
+ self.script_root, '', '')).rstrip('/') + '/'
+ def _set_base_url(self, value):
+ if value is None:
+ scheme = 'http'
+ netloc = 'localhost'
+ scheme = 'http'
+ script_root = ''
+ else:
+ scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(value)
+ if qs or anchor:
+ raise ValueError('base url must not contain a query string '
+ 'or fragment')
+ self.script_root = script_root.rstrip('/')
+ self.host = netloc
+ self.url_scheme = scheme
+ base_url = property(_get_base_url, _set_base_url, doc='''
+ The base URL is a URL that is used to extract the WSGI
+ URL scheme, host (server name + server port) and the
+ script root (`SCRIPT_NAME`).''')
+ del _get_base_url, _set_base_url
+ def _get_content_type(self):
+ ct = self.headers.get('Content-Type')
+ if ct is None and not self._input_stream:
+ if self.method in ('POST', 'PUT'):
+ if self._files:
+ return 'multipart/form-data'
+ return 'application/x-www-form-urlencoded'
+ return None
+ return ct
+ def _set_content_type(self, value):
+ if value is None:
+ self.headers.pop('Content-Type', None)
+ else:
+ self.headers['Content-Type'] = value
+ content_type = property(_get_content_type, _set_content_type, doc='''
+ The content type for the request. Reflected from and to the
+ :attr:`headers`. Do not set if you set :attr:`files` or
+ :attr:`form` for auto detection.''')
+ del _get_content_type, _set_content_type
+ def _get_content_length(self):
+ return self.headers.get('Content-Length', type=int)
+ def _set_content_length(self, value):
+ if value is None:
+ self.headers.pop('Content-Length', None)
+ else:
+ self.headers['Content-Length'] = str(value)
+ content_length = property(_get_content_length, _set_content_length, doc='''
+ The content length as integer. Reflected from and to the
+ :attr:`headers`. Do not set if you set :attr:`files` or
+ :attr:`form` for auto detection.''')
+ del _get_content_length, _set_content_length
+ def form_property(name, storage, doc):
+ key = '_' + name
+ def getter(self):
+ if self._input_stream is not None:
+ raise AttributeError('an input stream is defined')
+ rv = getattr(self, key)
+ if rv is None:
+ rv = storage()
+ setattr(self, key, rv)
+ return rv
+ def setter(self, value):
+ self._input_stream = None
+ setattr(self, key, value)
+ return property(getter, setter, doc)
+ form = form_property('form', MultiDict, doc='''
+ A :class:`MultiDict` of form values.''')
+ files = form_property('files', FileMultiDict, doc='''
+ A :class:`FileMultiDict` of uploaded files. You can use the
+ :meth:`~FileMultiDict.add_file` method to add new files to the
+ dict.''')
+ del form_property
+ def _get_input_stream(self):
+ return self._input_stream
+ def _set_input_stream(self, value):
+ self._input_stream = value
+ self._form = self._files = None
+ input_stream = property(_get_input_stream, _set_input_stream, doc='''
+ An optional input stream. If you set this it will clear
+ :attr:`form` and :attr:`files`.''')
+ del _get_input_stream, _set_input_stream
+ def _get_query_string(self):
+ if self._query_string is None:
+ if self._args is not None:
+ return url_encode(self._args, charset=self.charset)
+ return ''
+ return self._query_string
+ def _set_query_string(self, value):
+ self._query_string = value
+ self._args = None
+ query_string = property(_get_query_string, _set_query_string, doc='''
+ The query string. If you set this to a string :attr:`args` will
+ no longer be available.''')
+ del _get_query_string, _set_query_string
+ def _get_args(self):
+ if self._query_string is not None:
+ raise AttributeError('a query string is defined')
+ if self._args is None:
+ self._args = MultiDict()
+ return self._args
+ def _set_args(self, value):
+ self._query_string = None
+ self._args = value
+ args = property(_get_args, _set_args, doc='''
+ The URL arguments as :class:`MultiDict`.''')
+ del _get_args, _set_args
+ @property
+ def server_name(self):
+ """The server name (read-only, use :attr:`host` to set)"""
+ return self.host.split(':', 1)[0]
+ @property
+ def server_port(self):
+ """The server port as integer (read-only, use :attr:`host` to set)"""
+ pieces = self.host.split(':', 1)
+ if len(pieces) == 2 and pieces[1].isdigit():
+ return int(pieces[1])
+ elif self.url_scheme == 'https':
+ return 443
+ return 80
+ def __del__(self):
+ self.close()
+ def close(self):
+ """Closes all files. If you put real :class:`file` objects into the
+ :attr:`files` dict you can call this method to automatically close
+ them all in one go.
+ """
+ if self.closed:
+ return
+ try:
+ files = self.files.itervalues()
+ except AttributeError:
+ files = ()
+ for f in files:
+ try:
+ f.close()
+ except Exception, e:
+ pass
+ self.closed = True
+ def get_environ(self):
+ """Return the built environ."""
+ input_stream = self.input_stream
+ content_length = self.content_length
+ content_type = self.content_type
+ if input_stream is not None:
+ start_pos = input_stream.tell()
+ input_stream.seek(0, 2)
+ end_pos = input_stream.tell()
+ input_stream.seek(start_pos)
+ content_length = end_pos - start_pos
+ elif content_type == 'multipart/form-data':
+ values = CombinedMultiDict([self.form, self.files])
+ input_stream, content_length, boundary = \
+ stream_encode_multipart(values, charset=self.charset)
+ content_type += '; boundary="%s"' % boundary
+ elif content_type == 'application/x-www-form-urlencoded':
+ values = url_encode(self.form, charset=self.charset)
+ content_length = len(values)
+ input_stream = StringIO(values)
+ else:
+ input_stream = _empty_stream
+ result = {}
+ if self.environ_base:
+ result.update(self.environ_base)
+ def _path_encode(x):
+ if isinstance(x, unicode):
+ x = x.encode(self.charset)
+ return urllib.unquote(x)
+ result.update({
+ 'REQUEST_METHOD': self.method,
+ 'SCRIPT_NAME': _path_encode(self.script_root),
+ 'PATH_INFO': _path_encode(self.path),
+ 'QUERY_STRING': self.query_string,
+ 'SERVER_NAME': self.server_name,
+ 'SERVER_PORT': str(self.server_port),
+ 'HTTP_HOST': self.host,
+ 'SERVER_PROTOCOL': self.server_protocol,
+ 'CONTENT_TYPE': content_type or '',
+ 'CONTENT_LENGTH': str(content_length or '0'),
+ 'wsgi.version': self.wsgi_version,
+ 'wsgi.url_scheme': self.url_scheme,
+ 'wsgi.input': input_stream,
+ 'wsgi.errors': self.errors_stream,
+ 'wsgi.multithread': self.multithread,
+ 'wsgi.multiprocess': self.multiprocess,
+ 'wsgi.run_once': self.run_once
+ })
+ for key, value in self.headers.to_list(self.charset):
+ result['HTTP_%s' % key.upper().replace('-', '_')] = value
+ if self.environ_overrides:
+ result.update(self.environ_overrides)
+ return result
+ def get_request(self, cls=None):
+ """Returns a request with the data. If the request class is not
+ specified :attr:`request_class` is used.
+ :param cls: The request wrapper to use.
+ """
+ if cls is None:
+ cls = self.request_class
+ return cls(self.get_environ())
+class ClientRedirectError(Exception):
+ """
+ If a redirect loop is detected when using follow_redirects=True with
+ the :cls:`Client`, then this exception is raised.
+ """
+class Client(object):
+ """This class allows to send requests to a wrapped application.
+ The response wrapper can be a class or factory function that takes
+ three arguments: app_iter, status and headers. The default response
+ wrapper just returns a tuple.
+ Example::
+ class ClientResponse(BaseResponse):
+ ...
+ client = Client(MyApplication(), response_wrapper=ClientResponse)
+ The use_cookies parameter indicates whether cookies should be stored and
+ sent for subsequent requests. This is True by default, but passing False
+ will disable this behaviour.
+ .. versionadded:: 0.5
+ `use_cookies` is new in this version. Older versions did not provide
+ builtin cookie support.
+ """
+ def __init__(self, application, response_wrapper=None, use_cookies=True):
+ self.application = application
+ if response_wrapper is None:
+ response_wrapper = lambda a, s, h: (a, s, h)
+ self.response_wrapper = response_wrapper
+ if use_cookies:
+ self.cookie_jar = _TestCookieJar()
+ else:
+ self.cookie_jar = None
+ self.redirect_client = None
+ def open(self, *args, **kwargs):
+ """Takes the same arguments as the :class:`EnvironBuilder` class with
+ some additions: You can provide a :class:`EnvironBuilder` or a WSGI
+ environment as only argument instead of the :class:`EnvironBuilder`
+ arguments and two optional keyword arguments (`as_tuple`, `buffered`)
+ that change the type of the return value or the way the application is
+ executed.
+ .. versionchanged:: 0.5
+ If a dict is provided as file in the dict for the `data` parameter
+ the content type has to be called `content_type` now instead of
+ `mimetype`. This change was made for consistency with
+ :class:`werkzeug.FileWrapper`.
+ The `follow_redirects` parameter was added to :func:`open`.
+ Additional parameters:
+ :param as_tuple: Returns a tuple in the form ``(environ, result)``
+ :param buffered: Set this to True to buffer the application run.
+ This will automatically close the application for
+ you as well.
+ :param follow_redirects: Set this to True if the `Client` should
+ follow HTTP redirects.
+ """
+ as_tuple = kwargs.pop('as_tuple', False)
+ buffered = kwargs.pop('buffered', False)
+ follow_redirects = kwargs.pop('follow_redirects', False)
+ environ = None
+ if not kwargs and len(args) == 1:
+ if isinstance(args[0], EnvironBuilder):
+ environ = args[0].get_environ()
+ elif isinstance(args[0], dict):
+ environ = args[0]
+ if environ is None:
+ builder = EnvironBuilder(*args, **kwargs)
+ try:
+ environ = builder.get_environ()
+ finally:
+ builder.close()
+ if self.cookie_jar is not None:
+ self.cookie_jar.inject_wsgi(environ)
+ rv = run_wsgi_app(self.application, environ, buffered=buffered)
+ if self.cookie_jar is not None:
+ self.cookie_jar.extract_wsgi(environ, rv[2])
+ # handle redirects
+ redirect_chain = []
+ status_code = int(rv[1].split(None, 1)[0])
+ while status_code in (301, 302, 303, 305, 307) and follow_redirects:
+ if not self.redirect_client:
+ # assume that we're not using the user defined response wrapper
+ # so that we don't need any ugly hacks to get the status
+ # code from the response.
+ self.redirect_client = Client(self.application)
+ self.redirect_client.cookie_jar = self.cookie_jar
+ redirect = dict(rv[2])['Location']
+ scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(redirect)
+ base_url = urlparse.urlunsplit((scheme, netloc, '', '', '')).rstrip('/') + '/'
+ host = get_host(create_environ('/', base_url, query_string=qs)).split(':', 1)[0]
+ if get_host(environ).split(':', 1)[0] != host:
+ raise RuntimeError('%r does not support redirect to '
+ 'external targets' % self.__class__)
+ redirect_chain.append((redirect, status_code))
+ # the redirect request should be a new request, and not be based on
+ # the old request
+ redirect_kwargs = {}
+ redirect_kwargs.update({
+ 'path': script_root,
+ 'base_url': base_url,
+ 'query_string': qs,
+ 'as_tuple': True,
+ 'buffered': buffered,
+ 'follow_redirects': False,
+ })
+ environ, rv = self.redirect_client.open(**redirect_kwargs)
+ status_code = int(rv[1].split(None, 1)[0])
+ # Prevent loops
+ if redirect_chain[-1] in redirect_chain[0:-1]:
+ raise ClientRedirectError("loop detected")
+ response = self.response_wrapper(*rv)
+ if as_tuple:
+ return environ, response
+ return response
+ def get(self, *args, **kw):
+ """Like open but method is enforced to GET."""
+ kw['method'] = 'GET'
+ return self.open(*args, **kw)
+ def post(self, *args, **kw):
+ """Like open but method is enforced to POST."""
+ kw['method'] = 'POST'
+ return self.open(*args, **kw)
+ def head(self, *args, **kw):
+ """Like open but method is enforced to HEAD."""
+ kw['method'] = 'HEAD'
+ return self.open(*args, **kw)
+ def put(self, *args, **kw):
+ """Like open but method is enforced to PUT."""
+ kw['method'] = 'PUT'
+ return self.open(*args, **kw)
+ def delete(self, *args, **kw):
+ """Like open but method is enforced to DELETE."""
+ kw['method'] = 'DELETE'
+ return self.open(*args, **kw)
+ def __repr__(self):
+ return '<%s %r>' % (
+ self.__class__.__name__,
+ self.application
+ )
+def create_environ(*args, **kwargs):
+ """Create a new WSGI environ dict based on the values passed. The first
+ parameter should be the path of the request which defaults to '/'. The
+ second one can either be an absolute path (in that case the host is
+ localhost:80) or a full path to the request with scheme, netloc port and
+ the path to the script.
+ This accepts the same arguments as the :class:`EnvironBuilder`
+ constructor.
+ .. versionchanged:: 0.5
+ This function is now a thin wrapper over :class:`EnvironBuilder` which
+ was added in 0.5. The `headers`, `environ_base`, `environ_overrides`
+ and `charset` parameters were added.
+ """
+ builder = EnvironBuilder(*args, **kwargs)
+ try:
+ return builder.get_environ()
+ finally:
+ builder.close()
+def run_wsgi_app(app, environ, buffered=False):
+ """Return a tuple in the form (app_iter, status, headers) of the
+ application output. This works best if you pass it an application that
+ returns an iterator all the time.
+ Sometimes applications may use the `write()` callable returned
+ by the `start_response` function. This tries to resolve such edge
+ cases automatically. But if you don't get the expected output you
+ should set `buffered` to `True` which enforces buffering.
+ If passed an invalid WSGI application the behavior of this function is
+ undefined. Never pass non-conforming WSGI applications to this function.
+ :param app: the application to execute.
+ :param buffered: set to `True` to enforce buffering.
+ :return: tuple in the form ``(app_iter, status, headers)``
+ """
+ environ = _get_environ(environ)
+ response = []
+ buffer = []
+ def start_response(status, headers, exc_info=None):
+ if exc_info is not None:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ response[:] = [status, headers]
+ return buffer.append
+ app_iter = app(environ, start_response)
+ # when buffering we emit the close call early and convert the
+ # application iterator into a regular list
+ if buffered:
+ close_func = getattr(app_iter, 'close', None)
+ try:
+ app_iter = list(app_iter)
+ finally:
+ if close_func is not None:
+ close_func()
+ # otherwise we iterate the application iter until we have
+ # a response, chain the already received data with the already
+ # collected data and wrap it in a new `ClosingIterator` if
+ # we have a close callable.
+ else:
+ while not response:
+ buffer.append(app_iter.next())
+ if buffer:
+ close_func = getattr(app_iter, 'close', None)
+ app_iter = chain(buffer, app_iter)
+ if close_func is not None:
+ app_iter = ClosingIterator(app_iter, close_func)
+ return app_iter, response[0], response[1]
+from werkzeug.wsgi import ClosingIterator