Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/cherrypy/lib/cptools.py
diff options
context:
space:
mode:
Diffstat (limited to 'cherrypy/lib/cptools.py')
-rwxr-xr-xcherrypy/lib/cptools.py611
1 files changed, 611 insertions, 0 deletions
diff --git a/cherrypy/lib/cptools.py b/cherrypy/lib/cptools.py
new file mode 100755
index 0000000..3eedf97
--- /dev/null
+++ b/cherrypy/lib/cptools.py
@@ -0,0 +1,611 @@
+"""Functions for builtin CherryPy tools."""
+
+import logging
+import re
+
+import cherrypy
+from cherrypy._cpcompat import basestring, ntob, md5, set
+from cherrypy.lib import httputil as _httputil
+
+
+# Conditional HTTP request support #
+
+def validate_etags(autotags=False, debug=False):
+ """Validate the current ETag against If-Match, If-None-Match headers.
+
+ If autotags is True, an ETag response-header value will be provided
+ from an MD5 hash of the response body (unless some other code has
+ already provided an ETag header). If False (the default), the ETag
+ will not be automatic.
+
+ WARNING: the autotags feature is not designed for URL's which allow
+ methods other than GET. For example, if a POST to the same URL returns
+ no content, the automatic ETag will be incorrect, breaking a fundamental
+ use for entity tags in a possibly destructive fashion. Likewise, if you
+ raise 304 Not Modified, the response body will be empty, the ETag hash
+ will be incorrect, and your application will break.
+ See :rfc:`2616` Section 14.24.
+ """
+ response = cherrypy.serving.response
+
+ # Guard against being run twice.
+ if hasattr(response, "ETag"):
+ return
+
+ status, reason, msg = _httputil.valid_status(response.status)
+
+ etag = response.headers.get('ETag')
+
+ # Automatic ETag generation. See warning in docstring.
+ if etag:
+ if debug:
+ cherrypy.log('ETag already set: %s' % etag, 'TOOLS.ETAGS')
+ elif not autotags:
+ if debug:
+ cherrypy.log('Autotags off', 'TOOLS.ETAGS')
+ elif status != 200:
+ if debug:
+ cherrypy.log('Status not 200', 'TOOLS.ETAGS')
+ else:
+ etag = response.collapse_body()
+ etag = '"%s"' % md5(etag).hexdigest()
+ if debug:
+ cherrypy.log('Setting ETag: %s' % etag, 'TOOLS.ETAGS')
+ response.headers['ETag'] = etag
+
+ response.ETag = etag
+
+ # "If the request would, without the If-Match header field, result in
+ # anything other than a 2xx or 412 status, then the If-Match header
+ # MUST be ignored."
+ if debug:
+ cherrypy.log('Status: %s' % status, 'TOOLS.ETAGS')
+ if status >= 200 and status <= 299:
+ request = cherrypy.serving.request
+
+ conditions = request.headers.elements('If-Match') or []
+ conditions = [str(x) for x in conditions]
+ if debug:
+ cherrypy.log('If-Match conditions: %s' % repr(conditions),
+ 'TOOLS.ETAGS')
+ if conditions and not (conditions == ["*"] or etag in conditions):
+ raise cherrypy.HTTPError(412, "If-Match failed: ETag %r did "
+ "not match %r" % (etag, conditions))
+
+ conditions = request.headers.elements('If-None-Match') or []
+ conditions = [str(x) for x in conditions]
+ if debug:
+ cherrypy.log('If-None-Match conditions: %s' % repr(conditions),
+ 'TOOLS.ETAGS')
+ if conditions == ["*"] or etag in conditions:
+ if debug:
+ cherrypy.log('request.method: %s' % request.method, 'TOOLS.ETAGS')
+ if request.method in ("GET", "HEAD"):
+ raise cherrypy.HTTPRedirect([], 304)
+ else:
+ raise cherrypy.HTTPError(412, "If-None-Match failed: ETag %r "
+ "matched %r" % (etag, conditions))
+
+def validate_since():
+ """Validate the current Last-Modified against If-Modified-Since headers.
+
+ If no code has set the Last-Modified response header, then no validation
+ will be performed.
+ """
+ response = cherrypy.serving.response
+ lastmod = response.headers.get('Last-Modified')
+ if lastmod:
+ status, reason, msg = _httputil.valid_status(response.status)
+
+ request = cherrypy.serving.request
+
+ since = request.headers.get('If-Unmodified-Since')
+ if since and since != lastmod:
+ if (status >= 200 and status <= 299) or status == 412:
+ raise cherrypy.HTTPError(412)
+
+ since = request.headers.get('If-Modified-Since')
+ if since and since == lastmod:
+ if (status >= 200 and status <= 299) or status == 304:
+ if request.method in ("GET", "HEAD"):
+ raise cherrypy.HTTPRedirect([], 304)
+ else:
+ raise cherrypy.HTTPError(412)
+
+
+# Tool code #
+
+def allow(methods=None, debug=False):
+ """Raise 405 if request.method not in methods (default GET/HEAD).
+
+ The given methods are case-insensitive, and may be in any order.
+ If only one method is allowed, you may supply a single string;
+ if more than one, supply a list of strings.
+
+ Regardless of whether the current method is allowed or not, this
+ also emits an 'Allow' response header, containing the given methods.
+ """
+ if not isinstance(methods, (tuple, list)):
+ methods = [methods]
+ methods = [m.upper() for m in methods if m]
+ if not methods:
+ methods = ['GET', 'HEAD']
+ elif 'GET' in methods and 'HEAD' not in methods:
+ methods.append('HEAD')
+
+ cherrypy.response.headers['Allow'] = ', '.join(methods)
+ if cherrypy.request.method not in methods:
+ if debug:
+ cherrypy.log('request.method %r not in methods %r' %
+ (cherrypy.request.method, methods), 'TOOLS.ALLOW')
+ raise cherrypy.HTTPError(405)
+ else:
+ if debug:
+ cherrypy.log('request.method %r in methods %r' %
+ (cherrypy.request.method, methods), 'TOOLS.ALLOW')
+
+
+def proxy(base=None, local='X-Forwarded-Host', remote='X-Forwarded-For',
+ scheme='X-Forwarded-Proto', debug=False):
+ """Change the base URL (scheme://host[:port][/path]).
+
+ For running a CP server behind Apache, lighttpd, or other HTTP server.
+
+ If you want the new request.base to include path info (not just the host),
+ you must explicitly set base to the full base path, and ALSO set 'local'
+ to '', so that the X-Forwarded-Host request header (which never includes
+ path info) does not override it. Regardless, the value for 'base' MUST
+ NOT end in a slash.
+
+ cherrypy.request.remote.ip (the IP address of the client) will be
+ rewritten if the header specified by the 'remote' arg is valid.
+ By default, 'remote' is set to 'X-Forwarded-For'. If you do not
+ want to rewrite remote.ip, set the 'remote' arg to an empty string.
+ """
+
+ request = cherrypy.serving.request
+
+ if scheme:
+ s = request.headers.get(scheme, None)
+ if debug:
+ cherrypy.log('Testing scheme %r:%r' % (scheme, s), 'TOOLS.PROXY')
+ if s == 'on' and 'ssl' in scheme.lower():
+ # This handles e.g. webfaction's 'X-Forwarded-Ssl: on' header
+ scheme = 'https'
+ else:
+ # This is for lighttpd/pound/Mongrel's 'X-Forwarded-Proto: https'
+ scheme = s
+ if not scheme:
+ scheme = request.base[:request.base.find("://")]
+
+ if local:
+ lbase = request.headers.get(local, None)
+ if debug:
+ cherrypy.log('Testing local %r:%r' % (local, lbase), 'TOOLS.PROXY')
+ if lbase is not None:
+ base = lbase.split(',')[0]
+ if not base:
+ port = request.local.port
+ if port == 80:
+ base = '127.0.0.1'
+ else:
+ base = '127.0.0.1:%s' % port
+
+ if base.find("://") == -1:
+ # add http:// or https:// if needed
+ base = scheme + "://" + base
+
+ request.base = base
+
+ if remote:
+ xff = request.headers.get(remote)
+ if debug:
+ cherrypy.log('Testing remote %r:%r' % (remote, xff), 'TOOLS.PROXY')
+ if xff:
+ if remote == 'X-Forwarded-For':
+ # See http://bob.pythonmac.org/archives/2005/09/23/apache-x-forwarded-for-caveat/
+ xff = xff.split(',')[-1].strip()
+ request.remote.ip = xff
+
+
+def ignore_headers(headers=('Range',), debug=False):
+ """Delete request headers whose field names are included in 'headers'.
+
+ This is a useful tool for working behind certain HTTP servers;
+ for example, Apache duplicates the work that CP does for 'Range'
+ headers, and will doubly-truncate the response.
+ """
+ request = cherrypy.serving.request
+ for name in headers:
+ if name in request.headers:
+ if debug:
+ cherrypy.log('Ignoring request header %r' % name,
+ 'TOOLS.IGNORE_HEADERS')
+ del request.headers[name]
+
+
+def response_headers(headers=None, debug=False):
+ """Set headers on the response."""
+ if debug:
+ cherrypy.log('Setting response headers: %s' % repr(headers),
+ 'TOOLS.RESPONSE_HEADERS')
+ for name, value in (headers or []):
+ cherrypy.serving.response.headers[name] = value
+response_headers.failsafe = True
+
+
+def referer(pattern, accept=True, accept_missing=False, error=403,
+ message='Forbidden Referer header.', debug=False):
+ """Raise HTTPError if Referer header does/does not match the given pattern.
+
+ pattern
+ A regular expression pattern to test against the Referer.
+
+ accept
+ If True, the Referer must match the pattern; if False,
+ the Referer must NOT match the pattern.
+
+ accept_missing
+ If True, permit requests with no Referer header.
+
+ error
+ The HTTP error code to return to the client on failure.
+
+ message
+ A string to include in the response body on failure.
+
+ """
+ try:
+ ref = cherrypy.serving.request.headers['Referer']
+ match = bool(re.match(pattern, ref))
+ if debug:
+ cherrypy.log('Referer %r matches %r' % (ref, pattern),
+ 'TOOLS.REFERER')
+ if accept == match:
+ return
+ except KeyError:
+ if debug:
+ cherrypy.log('No Referer header', 'TOOLS.REFERER')
+ if accept_missing:
+ return
+
+ raise cherrypy.HTTPError(error, message)
+
+
+class SessionAuth(object):
+ """Assert that the user is logged in."""
+
+ session_key = "username"
+ debug = False
+
+ def check_username_and_password(self, username, password):
+ pass
+
+ def anonymous(self):
+ """Provide a temporary user name for anonymous users."""
+ pass
+
+ def on_login(self, username):
+ pass
+
+ def on_logout(self, username):
+ pass
+
+ def on_check(self, username):
+ pass
+
+ def login_screen(self, from_page='..', username='', error_msg='', **kwargs):
+ return ntob("""<html><body>
+Message: %(error_msg)s
+<form method="post" action="do_login">
+ Login: <input type="text" name="username" value="%(username)s" size="10" /><br />
+ Password: <input type="password" name="password" size="10" /><br />
+ <input type="hidden" name="from_page" value="%(from_page)s" /><br />
+ <input type="submit" />
+</form>
+</body></html>""" % {'from_page': from_page, 'username': username,
+ 'error_msg': error_msg}, "utf-8")
+
+ def do_login(self, username, password, from_page='..', **kwargs):
+ """Login. May raise redirect, or return True if request handled."""
+ response = cherrypy.serving.response
+ error_msg = self.check_username_and_password(username, password)
+ if error_msg:
+ body = self.login_screen(from_page, username, error_msg)
+ response.body = body
+ if "Content-Length" in response.headers:
+ # Delete Content-Length header so finalize() recalcs it.
+ del response.headers["Content-Length"]
+ return True
+ else:
+ cherrypy.serving.request.login = username
+ cherrypy.session[self.session_key] = username
+ self.on_login(username)
+ raise cherrypy.HTTPRedirect(from_page or "/")
+
+ def do_logout(self, from_page='..', **kwargs):
+ """Logout. May raise redirect, or return True if request handled."""
+ sess = cherrypy.session
+ username = sess.get(self.session_key)
+ sess[self.session_key] = None
+ if username:
+ cherrypy.serving.request.login = None
+ self.on_logout(username)
+ raise cherrypy.HTTPRedirect(from_page)
+
+ def do_check(self):
+ """Assert username. May raise redirect, or return True if request handled."""
+ sess = cherrypy.session
+ request = cherrypy.serving.request
+ response = cherrypy.serving.response
+
+ username = sess.get(self.session_key)
+ if not username:
+ sess[self.session_key] = username = self.anonymous()
+ if self.debug:
+ cherrypy.log('No session[username], trying anonymous', 'TOOLS.SESSAUTH')
+ if not username:
+ url = cherrypy.url(qs=request.query_string)
+ if self.debug:
+ cherrypy.log('No username, routing to login_screen with '
+ 'from_page %r' % url, 'TOOLS.SESSAUTH')
+ response.body = self.login_screen(url)
+ if "Content-Length" in response.headers:
+ # Delete Content-Length header so finalize() recalcs it.
+ del response.headers["Content-Length"]
+ return True
+ if self.debug:
+ cherrypy.log('Setting request.login to %r' % username, 'TOOLS.SESSAUTH')
+ request.login = username
+ self.on_check(username)
+
+ def run(self):
+ request = cherrypy.serving.request
+ response = cherrypy.serving.response
+
+ path = request.path_info
+ if path.endswith('login_screen'):
+ if self.debug:
+ cherrypy.log('routing %r to login_screen' % path, 'TOOLS.SESSAUTH')
+ return self.login_screen(**request.params)
+ elif path.endswith('do_login'):
+ if request.method != 'POST':
+ response.headers['Allow'] = "POST"
+ if self.debug:
+ cherrypy.log('do_login requires POST', 'TOOLS.SESSAUTH')
+ raise cherrypy.HTTPError(405)
+ if self.debug:
+ cherrypy.log('routing %r to do_login' % path, 'TOOLS.SESSAUTH')
+ return self.do_login(**request.params)
+ elif path.endswith('do_logout'):
+ if request.method != 'POST':
+ response.headers['Allow'] = "POST"
+ raise cherrypy.HTTPError(405)
+ if self.debug:
+ cherrypy.log('routing %r to do_logout' % path, 'TOOLS.SESSAUTH')
+ return self.do_logout(**request.params)
+ else:
+ if self.debug:
+ cherrypy.log('No special path, running do_check', 'TOOLS.SESSAUTH')
+ return self.do_check()
+
+
+def session_auth(**kwargs):
+ sa = SessionAuth()
+ for k, v in kwargs.items():
+ setattr(sa, k, v)
+ return sa.run()
+session_auth.__doc__ = """Session authentication hook.
+
+Any attribute of the SessionAuth class may be overridden via a keyword arg
+to this function:
+
+""" + "\n".join(["%s: %s" % (k, type(getattr(SessionAuth, k)).__name__)
+ for k in dir(SessionAuth) if not k.startswith("__")])
+
+
+def log_traceback(severity=logging.ERROR, debug=False):
+ """Write the last error's traceback to the cherrypy error log."""
+ cherrypy.log("", "HTTP", severity=severity, traceback=True)
+
+def log_request_headers(debug=False):
+ """Write request headers to the cherrypy error log."""
+ h = [" %s: %s" % (k, v) for k, v in cherrypy.serving.request.header_list]
+ cherrypy.log('\nRequest Headers:\n' + '\n'.join(h), "HTTP")
+
+def log_hooks(debug=False):
+ """Write request.hooks to the cherrypy error log."""
+ request = cherrypy.serving.request
+
+ msg = []
+ # Sort by the standard points if possible.
+ from cherrypy import _cprequest
+ points = _cprequest.hookpoints
+ for k in request.hooks.keys():
+ if k not in points:
+ points.append(k)
+
+ for k in points:
+ msg.append(" %s:" % k)
+ v = request.hooks.get(k, [])
+ v.sort()
+ for h in v:
+ msg.append(" %r" % h)
+ cherrypy.log('\nRequest Hooks for ' + cherrypy.url() +
+ ':\n' + '\n'.join(msg), "HTTP")
+
+def redirect(url='', internal=True, debug=False):
+ """Raise InternalRedirect or HTTPRedirect to the given url."""
+ if debug:
+ cherrypy.log('Redirecting %sto: %s' %
+ ({True: 'internal ', False: ''}[internal], url),
+ 'TOOLS.REDIRECT')
+ if internal:
+ raise cherrypy.InternalRedirect(url)
+ else:
+ raise cherrypy.HTTPRedirect(url)
+
+def trailing_slash(missing=True, extra=False, status=None, debug=False):
+ """Redirect if path_info has (missing|extra) trailing slash."""
+ request = cherrypy.serving.request
+ pi = request.path_info
+
+ if debug:
+ cherrypy.log('is_index: %r, missing: %r, extra: %r, path_info: %r' %
+ (request.is_index, missing, extra, pi),
+ 'TOOLS.TRAILING_SLASH')
+ if request.is_index is True:
+ if missing:
+ if not pi.endswith('/'):
+ new_url = cherrypy.url(pi + '/', request.query_string)
+ raise cherrypy.HTTPRedirect(new_url, status=status or 301)
+ elif request.is_index is False:
+ if extra:
+ # If pi == '/', don't redirect to ''!
+ if pi.endswith('/') and pi != '/':
+ new_url = cherrypy.url(pi[:-1], request.query_string)
+ raise cherrypy.HTTPRedirect(new_url, status=status or 301)
+
+def flatten(debug=False):
+ """Wrap response.body in a generator that recursively iterates over body.
+
+ This allows cherrypy.response.body to consist of 'nested generators';
+ that is, a set of generators that yield generators.
+ """
+ import types
+ def flattener(input):
+ numchunks = 0
+ for x in input:
+ if not isinstance(x, types.GeneratorType):
+ numchunks += 1
+ yield x
+ else:
+ for y in flattener(x):
+ numchunks += 1
+ yield y
+ if debug:
+ cherrypy.log('Flattened %d chunks' % numchunks, 'TOOLS.FLATTEN')
+ response = cherrypy.serving.response
+ response.body = flattener(response.body)
+
+
+def accept(media=None, debug=False):
+ """Return the client's preferred media-type (from the given Content-Types).
+
+ If 'media' is None (the default), no test will be performed.
+
+ If 'media' is provided, it should be the Content-Type value (as a string)
+ or values (as a list or tuple of strings) which the current resource
+ can emit. The client's acceptable media ranges (as declared in the
+ Accept request header) will be matched in order to these Content-Type
+ values; the first such string is returned. That is, the return value
+ will always be one of the strings provided in the 'media' arg (or None
+ if 'media' is None).
+
+ If no match is found, then HTTPError 406 (Not Acceptable) is raised.
+ Note that most web browsers send */* as a (low-quality) acceptable
+ media range, which should match any Content-Type. In addition, "...if
+ no Accept header field is present, then it is assumed that the client
+ accepts all media types."
+
+ Matching types are checked in order of client preference first,
+ and then in the order of the given 'media' values.
+
+ Note that this function does not honor accept-params (other than "q").
+ """
+ if not media:
+ return
+ if isinstance(media, basestring):
+ media = [media]
+ request = cherrypy.serving.request
+
+ # Parse the Accept request header, and try to match one
+ # of the requested media-ranges (in order of preference).
+ ranges = request.headers.elements('Accept')
+ if not ranges:
+ # Any media type is acceptable.
+ if debug:
+ cherrypy.log('No Accept header elements', 'TOOLS.ACCEPT')
+ return media[0]
+ else:
+ # Note that 'ranges' is sorted in order of preference
+ for element in ranges:
+ if element.qvalue > 0:
+ if element.value == "*/*":
+ # Matches any type or subtype
+ if debug:
+ cherrypy.log('Match due to */*', 'TOOLS.ACCEPT')
+ return media[0]
+ elif element.value.endswith("/*"):
+ # Matches any subtype
+ mtype = element.value[:-1] # Keep the slash
+ for m in media:
+ if m.startswith(mtype):
+ if debug:
+ cherrypy.log('Match due to %s' % element.value,
+ 'TOOLS.ACCEPT')
+ return m
+ else:
+ # Matches exact value
+ if element.value in media:
+ if debug:
+ cherrypy.log('Match due to %s' % element.value,
+ 'TOOLS.ACCEPT')
+ return element.value
+
+ # No suitable media-range found.
+ ah = request.headers.get('Accept')
+ if ah is None:
+ msg = "Your client did not send an Accept header."
+ else:
+ msg = "Your client sent this Accept header: %s." % ah
+ msg += (" But this resource only emits these media types: %s." %
+ ", ".join(media))
+ raise cherrypy.HTTPError(406, msg)
+
+
+class MonitoredHeaderMap(_httputil.HeaderMap):
+
+ def __init__(self):
+ self.accessed_headers = set()
+
+ def __getitem__(self, key):
+ self.accessed_headers.add(key)
+ return _httputil.HeaderMap.__getitem__(self, key)
+
+ def __contains__(self, key):
+ self.accessed_headers.add(key)
+ return _httputil.HeaderMap.__contains__(self, key)
+
+ def get(self, key, default=None):
+ self.accessed_headers.add(key)
+ return _httputil.HeaderMap.get(self, key, default=default)
+
+ def has_key(self, key):
+ self.accessed_headers.add(key)
+ return _httputil.HeaderMap.has_key(self, key)
+
+
+def autovary(ignore=None, debug=False):
+ """Auto-populate the Vary response header based on request.header access."""
+ request = cherrypy.serving.request
+
+ req_h = request.headers
+ request.headers = MonitoredHeaderMap()
+ request.headers.update(req_h)
+ if ignore is None:
+ ignore = set(['Content-Disposition', 'Content-Length', 'Content-Type'])
+
+ def set_response_header():
+ resp_h = cherrypy.serving.response.headers
+ v = set([e.value for e in resp_h.elements('Vary')])
+ if debug:
+ cherrypy.log('Accessed headers: %s' % request.headers.accessed_headers,
+ 'TOOLS.AUTOVARY')
+ v = v.union(request.headers.accessed_headers)
+ v = v.difference(ignore)
+ v = list(v)
+ v.sort()
+ resp_h['Vary'] = ', '.join(v)
+ request.hooks.attach('before_finalize', set_response_header, 95)
+