diff options
Diffstat (limited to 'websdk/hatta/wiki.py')
-rw-r--r-- | websdk/hatta/wiki.py | 954 |
1 files changed, 954 insertions, 0 deletions
diff --git a/websdk/hatta/wiki.py b/websdk/hatta/wiki.py new file mode 100644 index 0000000..7eb961c --- /dev/null +++ b/websdk/hatta/wiki.py @@ -0,0 +1,954 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import gettext +import os +import sys +import re +import tempfile +import itertools + +import werkzeug +import werkzeug.routing +import jinja2 + +pygments = None +try: + import pygments +except ImportError: + pass + +import hatta +import storage +import search +import page +import parser +import error +import data + +import mercurial # import it after storage! + + +class WikiResponse(werkzeug.BaseResponse, werkzeug.ETagResponseMixin, + werkzeug.CommonResponseDescriptorsMixin): + """A typical HTTP response class made out of Werkzeug's mixins.""" + + def make_conditional(self, request): + ret = super(WikiResponse, self).make_conditional(request) + # Remove all headers if it's 304, according to + # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.3.5 + if self.status.startswith('304'): + self.response = [] + try: + del self.content_type + except (AttributeError, KeyError, IndexError): + pass + try: + del self.content_length + except (AttributeError, KeyError, IndexError): + pass + try: + del self.headers['Content-length'] + except (AttributeError, KeyError, IndexError): + pass + try: + del self.headers['Content-type'] + except (AttributeError, KeyError, IndexError): + pass + return ret + + +class WikiTempFile(object): + """Wrap a file for uploading content.""" + + def __init__(self, tmppath): + self.tmppath = tempfile.mkdtemp(dir=tmppath) + self.tmpname = os.path.join(self.tmppath, 'saved') + self.f = open(self.tmpname, "wb") + + def read(self, *args, **kw): + return self.f.read(*args, **kw) + + def readlines(self, *args, **kw): + return self.f.readlines(*args, **kw) + + def write(self, *args, **kw): + return self.f.write(*args, **kw) + + def seek(self, *args, **kw): + return self.f.seek(*args, **kw) + + def truncate(self, *args, **kw): + return self.f.truncate(*args, **kw) + + def close(self, *args, **kw): + ret = self.f.close(*args, **kw) + try: + os.unlink(self.tmpname) + except OSError: + pass + try: + os.rmdir(self.tmppath) + except OSError: + pass + return ret + + +class WikiRequest(werkzeug.BaseRequest, werkzeug.ETagRequestMixin): + """ + A Werkzeug's request with additional functions for handling file + uploads and wiki-specific link generation. + """ + + charset = 'utf-8' + encoding_errors = 'ignore' + + def __init__(self, wiki, adapter, environ, **kw): + werkzeug.BaseRequest.__init__(self, environ, shallow=False, **kw) + self.wiki = wiki + self.adapter = adapter + self.tmpfiles = [] + self.tmppath = wiki.path + + def get_url(self, title=None, view=None, method='GET', + external=False, **kw): + if view is None: + view = self.wiki.view + if title is not None: + kw['title'] = title.strip() + return self.adapter.build(view, kw, method=method, + force_external=external) + + def get_download_url(self, title): + return self.get_url(title, view=self.wiki.download) + + def get_author(self): + """Try to guess the author name. Use IP address as last resort.""" + + try: + cookie = werkzeug.url_unquote(self.cookies.get("author", "")) + except UnicodeError: + cookie = None + try: + auth = werkzeug.url_unquote(self.environ.get('REMOTE_USER', "")) + except UnicodeError: + auth = None + author = (self.form.get("author") or cookie or auth or + self.remote_addr) + return author + + def _get_file_stream(self, total_content_length=None, content_type=None, + filename=None, content_length=None): + """Save all the POSTs to temporary files.""" + + temp_file = WikiTempFile(self.tmppath) + self.tmpfiles.append(temp_file) + return temp_file + + def cleanup(self): + """Clean up the temporary files created by POSTs.""" + + for temp_file in self.tmpfiles: + temp_file.close() + self.tmpfiles = [] + + +class WikiTitleConverter(werkzeug.routing.PathConverter): + """Behaves like the path converter, but doesn't match the "+ pages".""" + + def to_url(self, value): + return werkzeug.url_quote(value.strip(), self.map.charset, safe="/") + + regex = '([^+%]|%[^2]|%2[^Bb]).*' + + +class WikiAllConverter(werkzeug.routing.BaseConverter): + """Matches everything.""" + + regex = '.*' + + +class URL(object): + """A decorator for marking methods as endpoints for URLs.""" + + urls = [] + + def __init__(self, url, methods=None): + """Create a decorator with specified parameters.""" + + self.url = url + self.methods = methods or ['GET', 'HEAD'] + + def __call__(self, func): + """The actual decorator only records the data.""" + + self.urls.append((func.__name__, self.url, self.methods)) + return func + + @classmethod + def rules(cls, app): + """Returns the routing rules, using app's bound methods.""" + + for name, url, methods in cls.urls: + func = getattr(app, name, None) + if not callable(func): + continue + yield werkzeug.routing.Rule(url, endpoint=func, methods=methods) + + +class Wiki(object): + """ + The main class of the wiki, handling initialization of the whole + application and most of the logic. + """ + storage_class = storage.WikiStorage + index_class = search.WikiSearch + filename_map = page.filename_map + mime_map = page.mime_map + icon = data.icon + scripts = data.scripts + style = data.style + + def __init__(self, config): + if config.get_bool('show_version', False): + sys.stdout.write("Hatta %s\n" % hatta.__version__) + sys.exit() + self.dead = False + self.config = config + + self.language = config.get('language', None) + if self.language is not None: + try: + translation = gettext.translation('hatta', 'locale', + languages=[self.language]) + + except IOError: + translation = gettext.translation('hatta', fallback=True, + languages=[self.language]) + else: + translation = gettext.translation('hatta', fallback=True) + self.gettext = translation.ugettext + self.template_env = jinja2.Environment( + extensions=['jinja2.ext.i18n'], + loader=jinja2.PackageLoader('hatta', 'templates'), + ) + self.template_env.autoescape = True + self.template_env.install_gettext_translations(translation, True) + self.path = os.path.abspath(config.get('pages_path', 'docs')) + self.page_charset = config.get('page_charset', 'utf-8') + self.menu_page = self.config.get('menu_page', u'Menu') + self.front_page = self.config.get('front_page', u'Home') + self.logo_page = self.config.get('logo_page', u'logo.png') + self.locked_page = self.config.get('locked_page', u'Locked') + self.site_name = self.config.get('site_name', u'Hatta Wiki') + self.read_only = self.config.get_bool('read_only', False) + self.icon_page = self.config.get('icon_page', None) + self.alias_page = self.config.get('alias_page', 'Alias') + self.pygments_style = self.config.get('pygments_style', 'tango') + self.subdirectories = self.config.get_bool('subdirectories', False) + self.extension = self.config.get('extension', None) + self.unix_eol = self.config.get_bool('unix_eol', False) + if self.subdirectories: + self.storage = storage.WikiSubdirectoryStorage(self.path, + self.page_charset, + self.gettext, + self.unix_eol, + self.extension) + else: + self.storage = self.storage_class(self.path, self.page_charset, + self.gettext, self.unix_eol, + self.extension) + self.cache = os.path.abspath(config.get('cache_path', + os.path.join(self.storage.repo_path, + '.hg', 'hatta', 'cache'))) + self.index = self.index_class(self.cache, self.language, self.storage) + self.index.update(self) + self.url_rules = URL.rules(self) + self.url_map = werkzeug.routing.Map(self.url_rules, converters={ + 'title': WikiTitleConverter, + 'all': WikiAllConverter, + }) + + def add_url_rule(self, rule): + """Let plugins add additional url rules.""" + + self.url_rules.append(rule) + self.url_map = werkzeug.routing.Map(self.url_rules, converters={ + 'title': WikiTitleConverter, + 'all': WikiAllConverter, + }) + + def get_page(self, request, title): + """Creates a page object based on page's mime type""" + + if title: + try: + page_class, mime = self.filename_map[title] + except KeyError: + mime = page.page_mime(title) + major, minor = mime.split('/', 1) + try: + page_class = self.mime_map[mime] + except KeyError: + try: + plus_pos = minor.find('+') + if plus_pos > 0: + minor_base = minor[plus_pos:] + else: + minor_base = '' + base_mime = '/'.join([major, minor_base]) + page_class = self.mime_map[base_mime] + except KeyError: + try: + page_class = self.mime_map[major] + except KeyError: + page_class = self.mime_map[''] + else: + page_class = page.WikiPageSpecial + mime = '' + return page_class(self, request, title, mime) + + def response(self, request, title, content, etag='', mime='text/html', + rev=None, size=None): + """Create a WikiResponse for a page.""" + + response = WikiResponse(content, mimetype=mime) + if rev is None: + inode, _size, mtime = self.storage.page_file_meta(title) + response.set_etag(u'%s/%s/%d-%d' % (etag, + werkzeug.url_quote(title), + inode, mtime)) + if size == -1: + size = _size + else: + response.set_etag(u'%s/%s/%s' % (etag, werkzeug.url_quote(title), + rev)) + if size: + response.content_length = size + response.make_conditional(request) + return response + + def _check_lock(self, title): + _ = self.gettext + restricted_pages = [ + 'scripts.js', + 'robots.txt', + ] + if self.read_only: + raise error.ForbiddenErr(_(u"This site is read-only.")) + if title in restricted_pages: + raise error.ForbiddenErr(_(u"""Can't edit this page. +It can only be edited by the site admin directly on the disk.""")) + if title in self.index.page_links(self.locked_page): + raise error.ForbiddenErr(_(u"This page is locked.")) + + def _serve_default(self, request, title, content, mime): + """Some pages have their default content.""" + + if title in self.storage: + return self.download(request, title) + response = WikiResponse(content, mimetype=mime) + response.set_etag('/%s/-1' % title) + response.make_conditional(request) + return response + + @URL('/<title:title>') + @URL('/') + def view(self, request, title=None): + if title is None: + title = self.front_page + page = self.get_page(request, title) + try: + content = page.view_content() + except error.NotFoundErr: + url = request.get_url(title, self.edit, external=True) + return werkzeug.routing.redirect(url, code=303) + html = page.template("page.html", content=content) + dependencies = page.dependencies() + etag = '/(%s)' % u','.join(dependencies) + return self.response(request, title, html, etag=etag) + + @URL('/+history/<title:title>/<int:rev>') + def revision(self, request, title, rev): + _ = self.gettext + text = self.storage.revision_text(title, rev) + link = werkzeug.html.a(werkzeug.html(title), + href=request.get_url(title)) + content = [ + werkzeug.html.p( + werkzeug.html( + _(u'Content of revision %(rev)d of page %(title)s:')) + % {'rev': rev, 'title': link}), + werkzeug.html.pre(werkzeug.html(text)), + ] + special_title = _(u'Revision of "%(title)s"') % {'title': title} + page = self.get_page(request, title) + html = page.template('page_special.html', content=content, + special_title=special_title) + response = self.response(request, title, html, rev=rev, etag='/old') + return response + + @URL('/+version/') + @URL('/+version/<title:title>') + def version(self, request, title=None): + if title is None: + version = self.storage.repo_revision() + else: + try: + version, x, x, x = self.storage.page_history(title).next() + except StopIteration: + version = 0 + return WikiResponse('%d' % version, mimetype="text/plain") + + @URL('/+edit/<title:title>', methods=['POST']) + def save(self, request, title): + _ = self.gettext + self._check_lock(title) + url = request.get_url(title) + if request.form.get('cancel'): + if title not in self.storage: + url = request.get_url(self.front_page) + if request.form.get('preview'): + text = request.form.get("text") + if text is not None: + lines = text.split('\n') + else: + lines = [werkzeug.html.p(werkzeug.html( + _(u'No preview for binaries.')))] + return self.edit(request, title, preview=lines) + elif request.form.get('save'): + comment = request.form.get("comment", "") + author = request.get_author() + text = request.form.get("text") + try: + parent = int(request.form.get("parent")) + except (ValueError, TypeError): + parent = None + self.storage.reopen() + self.index.update(self) + page = self.get_page(request, title) + if text is not None: + if title == self.locked_page: + for link, label in page.extract_links(text): + if title == link: + raise error.ForbiddenErr( + _(u"This page is locked.")) + if u'href="' in comment or u'http:' in comment: + raise error.ForbiddenErr() + if text.strip() == '': + self.storage.delete_page(title, author, comment) + url = request.get_url(self.front_page) + else: + self.storage.save_text(title, text, author, comment, + parent) + else: + text = u'' + upload = request.files['data'] + f = upload.stream + if f is not None and upload.filename is not None: + try: + self.storage.save_file(title, f.tmpname, author, + comment, parent) + except AttributeError: + self.storage.save_data(title, f.read(), author, + comment, parent) + else: + self.storage.delete_page(title, author, comment) + url = request.get_url(self.front_page) + self.index.update_page(page, title, text=text) + response = werkzeug.routing.redirect(url, code=303) + response.set_cookie('author', + werkzeug.url_quote(request.get_author()), + max_age=604800) + return response + + @URL('/+edit/<title:title>', methods=['GET']) + def edit(self, request, title, preview=None): + self._check_lock(title) + exists = title in self.storage + if exists: + self.storage.reopen() + page = self.get_page(request, title) + html = page.render_editor(preview) + if not exists: + response = WikiResponse(html, mimetype="text/html", + status='404 Not found') + + elif preview: + response = WikiResponse(html, mimetype="text/html") + else: + response = self.response(request, title, html, '/edit') + response.headers.add('Cache-Control', 'no-cache') + return response + + @URL('/+feed/atom') + @URL('/+feed/rss') + def atom(self, request): + _ = self.gettext + feed = werkzeug.contrib.atom.AtomFeed(self.site_name, + feed_url=request.url, + url=request.adapter.build(self.view, force_external=True), + subtitle=_(u'Track the most recent changes to the wiki ' + u'in this feed.')) + history = itertools.islice(self.storage.history(), None, 10, None) + unique_titles = set() + for title, rev, date, author, comment in history: + if title in unique_titles: + continue + unique_titles.add(title) + if rev > 0: + url = request.adapter.build(self.diff, { + 'title': title, + 'from_rev': rev - 1, + 'to_rev': rev, + }, force_external=True) + else: + url = request.adapter.build(self.revision, { + 'title': title, + 'rev': rev, + }, force_external=True) + feed.add(title, comment, content_type="text", author=author, + url=url, updated=date) + rev = self.storage.repo_revision() + response = self.response(request, 'atom', feed.generate(), '/+feed', + 'application/xml', rev) + response.make_conditional(request) + return response + + @URL('/+download/<title:title>') + def download(self, request, title): + """Serve the raw content of a page directly from disk.""" + + mime = page.page_mime(title) + if mime == 'text/x-wiki': + mime = 'text/plain' + try: + wrap_file = werkzeug.wrap_file + except AttributeError: + wrap_file = lambda x, y: y + f = wrap_file(request.environ, self.storage.open_page(title)) + response = self.response(request, title, f, '/download', mime, size=-1) + response.direct_passthrough = True + return response + + @URL('/+render/<title:title>') + def render(self, request, title): + """Serve a thumbnail or otherwise rendered content.""" + + def file_time_and_size(file_path): + """Get file's modification timestamp and its size.""" + + try: + (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, + st_atime, st_mtime, st_ctime) = os.stat(file_path) + except OSError: + st_mtime = 0 + st_size = None + return st_mtime, st_size + + def rm_temp_dir(dir_path): + """Delete the directory with subdirectories.""" + + for root, dirs, files in os.walk(dir_path, topdown=False): + for name in files: + try: + os.remove(os.path.join(root, name)) + except OSError: + pass + for name in dirs: + try: + os.rmdir(os.path.join(root, name)) + except OSError: + pass + try: + os.rmdir(dir_path) + except OSError: + pass + + page = self.get_page(request, title) + try: + cache_filename, cache_mime = page.render_mime() + render = page.render_cache + except (AttributeError, NotImplementedError): + return self.download(request, title) + + cache_dir = os.path.join(self.cache, 'render', + werkzeug.url_quote(title, safe='')) + cache_file = os.path.join(cache_dir, cache_filename) + page_inode, page_size, page_mtime = self.storage.page_file_meta(title) + cache_mtime, cache_size = file_time_and_size(cache_file) + if page_mtime > cache_mtime: + if not os.path.exists(cache_dir): + os.makedirs(cache_dir) + try: + temp_dir = tempfile.mkdtemp(dir=cache_dir) + result_file = render(temp_dir) + mercurial.util.rename(result_file, cache_file) + finally: + rm_temp_dir(temp_dir) + try: + wrap_file = werkzeug.wrap_file + except AttributeError: + wrap_file = lambda x, y: y + f = wrap_file(request.environ, open(cache_file)) + response = self.response(request, title, f, '/render', cache_mime, + size=cache_size) + response.direct_passthrough = True + return response + + @URL('/+undo/<title:title>', methods=['POST']) + def undo(self, request, title): + """Revert a change to a page.""" + + _ = self.gettext + self._check_lock(title) + rev = None + for key in request.form: + try: + rev = int(key) + except ValueError: + pass + author = request.get_author() + if rev is not None: + try: + parent = int(request.form.get("parent")) + except (ValueError, TypeError): + parent = None + self.storage.reopen() + self.index.update(self) + if rev == 0: + comment = _(u'Delete page %(title)s') % {'title': title} + data = '' + self.storage.delete_page(title, author, comment) + else: + comment = _(u'Undo of change %(rev)d of page %(title)s') % { + 'rev': rev, 'title': title} + data = self.storage.page_revision(title, rev - 1) + self.storage.save_data(title, data, author, comment, parent) + page = self.get_page(request, title) + self.index.update_page(page, title, data=data) + url = request.adapter.build(self.history, {'title': title}, + method='GET', force_external=True) + return werkzeug.redirect(url, 303) + + @URL('/+history/<title:title>') + def history(self, request, title): + """Display history of changes of a page.""" + + max_rev = -1 + history = [] + page = self.get_page(request, title) + for rev, date, author, comment in self.storage.page_history(title): + if max_rev < rev: + max_rev = rev + if rev > 0: + date_url = request.adapter.build(self.diff, { + 'title': title, 'from_rev': rev - 1, 'to_rev': rev}) + else: + date_url = request.adapter.build(self.revision, { + 'title': title, 'rev': rev}) + history.append((date, date_url, rev, author, comment)) + html = page.template('history.html', history=history, + date_html=hatta.page.date_html, parent=max_rev) + response = self.response(request, title, html, '/history') + return response + + @URL('/+history/') + def recent_changes(self, request): + """Serve the recent changes page.""" + + def _changes_list(): + last = {} + lastrev = {} + count = 0 + for title, rev, date, author, comment in self.storage.history(): + if (author, comment) == last.get(title, (None, None)): + continue + count += 1 + if count > 100: + break + if rev > 0: + date_url = request.adapter.build(self.diff, { + 'title': title, + 'from_rev': rev - 1, + 'to_rev': lastrev.get(title, rev), + }) + elif rev == 0: + date_url = request.adapter.build(self.revision, { + 'title': title, 'rev': rev}) + else: + date_url = request.adapter.build(self.history, { + 'title': title}) + last[title] = author, comment + lastrev[title] = rev + + yield date, date_url, title, author, comment + + page = self.get_page(request, '') + html = page.template('changes.html', changes=_changes_list(), + date_html=hatta.page.date_html) + response = WikiResponse(html, mimetype='text/html') + response.set_etag('/history/%d' % self.storage.repo_revision()) + response.make_conditional(request) + return response + + @URL('/+history/<title:title>/<int:from_rev>:<int:to_rev>') + def diff(self, request, title, from_rev, to_rev): + """Show the differences between specified revisions.""" + + _ = self.gettext + page = self.get_page(request, title) + build = request.adapter.build + from_url = build(self.revision, {'title': title, 'rev': from_rev}) + to_url = build(self.revision, {'title': title, 'rev': to_rev}) + a = werkzeug.html.a + links = { + 'link1': a(str(from_rev), href=from_url), + 'link2': a(str(to_rev), href=to_url), + 'link': a(werkzeug.html(title), href=request.get_url(title)), + } + message = werkzeug.html(_( + u'Differences between revisions %(link1)s and %(link2)s ' + u'of page %(link)s.')) % links + diff_content = getattr(page, 'diff_content', None) + if diff_content: + from_text = self.storage.revision_text(page.title, from_rev) + to_text = self.storage.revision_text(page.title, to_rev) + content = page.diff_content(from_text, to_text, message) + else: + content = [werkzeug.html.p(werkzeug.html( + _(u"Diff not available for this kind of pages.")))] + special_title = _(u'Diff for "%(title)s"') % {'title': title} + html = page.template('page_special.html', content=content, + special_title=special_title) + response = WikiResponse(html, mimetype='text/html') + return response + + @URL('/+index') + def all_pages(self, request): + """Show index of all pages in the wiki.""" + + _ = self.gettext + page = self.get_page(request, '') + html = page.template('list.html', + pages=sorted(self.storage.all_pages()), + class_='index', + message=_(u'Index of all pages'), + special_title=_(u'Page Index')) + response = WikiResponse(html, mimetype='text/html') + response.set_etag('/+index/%d' % self.storage.repo_revision()) + response.make_conditional(request) + return response + + @URL('/+orphaned') + def orphaned(self, request): + """Show all pages that don't have backlinks.""" + + _ = self.gettext + page = self.get_page(request, '') + html = page.template('list.html', + pages=self.index.orphaned_pages(), + class_='orphaned', + message=_(u'List of pages with no links to them'), + special_title=_(u'Orphaned pages')) + response = WikiResponse(html, mimetype='text/html') + response.set_etag('/+orphaned/%d' % self.storage.repo_revision()) + response.make_conditional(request) + return response + + @URL('/+wanted') + def wanted(self, request): + """Show all pages that don't exist yet, but are linked.""" + + def _wanted_pages_list(): + for refs, title in self.index.wanted_pages(): + if not (parser.external_link(title) or title.startswith('+') + or title.startswith(':')): + yield refs, title + + page = self.get_page(request, '') + html = page.template('wanted.html', pages=_wanted_pages_list()) + response = WikiResponse(html, mimetype='text/html') + response.set_etag('/+wanted/%d' % self.storage.repo_revision()) + response.make_conditional(request) + return response + + @URL('/+search', methods=['GET', 'POST']) + def search(self, request): + """Serve the search results page.""" + + _ = self.gettext + + def search_snippet(title, words): + """Extract a snippet of text for search results.""" + + try: + text = self.storage.page_text(title) + except error.NotFoundErr: + return u'' + regexp = re.compile(u"|".join(re.escape(w) for w in words), + re.U | re.I) + match = regexp.search(text) + if match is None: + return u"" + position = match.start() + min_pos = max(position - 60, 0) + max_pos = min(position + 60, len(text)) + snippet = werkzeug.escape(text[min_pos:max_pos]) + highlighted = werkzeug.html.b(match.group(0), class_="highlight") + html = regexp.sub(highlighted, snippet) + return html + + def page_search(words, page, request): + """Display the search results.""" + + h = werkzeug.html + self.storage.reopen() + self.index.update(self) + result = sorted(self.index.find(words), key=lambda x: -x[0]) + yield werkzeug.html.p(h(_(u'%d page(s) containing all words:') + % len(result))) + yield u'<ol class="search">' + for number, (score, title) in enumerate(result): + yield h.li(h.b(page.wiki_link(title)), u' ', h.i(str(score)), + h.div(search_snippet(title, words), + _class="snippet"), + id_="search-%d" % (number + 1)) + yield u'</ol>' + + query = request.values.get('q', u'').strip() + page = self.get_page(request, '') + if not query: + url = request.get_url(view=self.all_pages, external=True) + return werkzeug.routing.redirect(url, code=303) + words = tuple(self.index.split_text(query)) + if not words: + words = (query,) + title = _(u'Searching for "%s"') % u" ".join(words) + content = page_search(words, page, request) + html = page.template('page_special.html', content=content, + special_title=title) + return WikiResponse(html, mimetype='text/html') + + @URL('/+search/<title:title>', methods=['GET', 'POST']) + def backlinks(self, request, title): + """Serve the page with backlinks.""" + + self.storage.reopen() + self.index.update(self) + page = self.get_page(request, title) + html = page.template('backlinks.html', + pages=self.index.page_backlinks(title)) + response = WikiResponse(html, mimetype='text/html') + response.set_etag('/+search/%d' % self.storage.repo_revision()) + response.make_conditional(request) + return response + + @URL('/+download/scripts.js') + def scripts_js(self, request): + """Server the default scripts""" + + return self._serve_default(request, 'scripts.js', self.scripts, + 'text/javascript') + + @URL('/+download/style.css') + def style_css(self, request): + """Serve the default style""" + + return self._serve_default(request, 'style.css', self.style, + 'text/css') + + @URL('/+download/pygments.css') + def pygments_css(self, request): + """Serve the default pygments style""" + + _ = self.gettext + if pygments is None: + raise error.NotImplementedErr( + _(u"Code highlighting is not available.")) + + pygments_style = self.pygments_style + if pygments_style not in pygments.styles.STYLE_MAP: + pygments_style = 'default' + formatter = pygments.formatters.HtmlFormatter(style=pygments_style) + style_defs = formatter.get_style_defs('.highlight') + return self._serve_default(request, 'pygments.css', style_defs, + 'text/css') + + @URL('/favicon.ico') + def favicon_ico(self, request): + """Serve the default favicon.""" + + return self._serve_default(request, 'favicon.ico', self.icon, + 'image/x-icon') + + @URL('/robots.txt') + def robots_txt(self, request): + """Serve the robots directives.""" + + robots = ('User-agent: *\r\n' + 'Disallow: /+*\r\n' + 'Disallow: /%2B*\r\n' + 'Disallow: /+edit\r\n' + 'Disallow: /+feed\r\n' + 'Disallow: /+history\r\n' + 'Disallow: /+search\r\n' + 'Disallow: /+hg\r\n') + return self._serve_default(request, 'robots.txt', robots, + 'text/plain') + + @URL('/+hg<all:path>', methods=['GET', 'POST', 'HEAD']) + def hgweb(self, request, path=None): + """ + Serve the pages repository on the web like a normal hg repository. + """ + + _ = self.gettext + if not self.config.get_bool('hgweb', False): + raise error.ForbiddenErr(_(u'Repository access disabled.')) + app = mercurial.hgweb.request.wsgiapplication( + lambda: mercurial.hgweb.hgweb(self.storage.repo, self.site_name)) + + def hg_app(env, start): + env = request.environ + prefix = '/+hg' + if env['PATH_INFO'].startswith(prefix): + env["PATH_INFO"] = env["PATH_INFO"][len(prefix):] + env["SCRIPT_NAME"] += prefix + return app(env, start) + return hg_app + + @URL('/shutdown', methods=['GET']) + def die(self, request): + """Terminate the standalone server if invoked from localhost.""" + _ = self.gettext + if not request.remote_addr.startswith('127.'): + raise error.ForbiddenErr( + _(u'This URL can only be called locally.')) + + if not 'werkzeug.server.shutdown' in request.environ: + raise RuntimeError('Not running the development server') + request.environ['werkzeug.server.shutdown']() + + def agony(request): + yield u'Goodbye!' + self.dead = True + + return WikiResponse(agony(request), mimetype='text/plain') + + @werkzeug.responder + def application(self, environ, start): + """The main application loop.""" + + adapter = self.url_map.bind_to_environ(environ) + request = WikiRequest(self, adapter, environ) + try: + try: + endpoint, values = adapter.match() + return endpoint(request, **values) + except werkzeug.exceptions.HTTPException, err: + return err + finally: + request.cleanup() + del request + del adapter |