# -*- coding: utf-8 -*-
#
# Copyright (C) 2006-2010 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://genshi.edgewall.org/wiki/License.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://genshi.edgewall.org/log/.
"""Markup templating engine."""
from itertools import chain
from genshi.core import Attrs, Markup, Namespace, Stream, StreamEventKind
from genshi.core import START, END, START_NS, END_NS, TEXT, PI, COMMENT
from genshi.input import XMLParser
from genshi.template.base import BadDirectiveError, Template, \
TemplateSyntaxError, _apply_directives, \
EXEC, INCLUDE, SUB
from genshi.template.eval import Suite
from genshi.template.interpolation import interpolate
from genshi.template.directives import *
from genshi.template.text import NewTextTemplate
__all__ = ['MarkupTemplate']
__docformat__ = 'restructuredtext en'
class MarkupTemplate(Template):
"""Implementation of the template language for XML-based templates.
>>> tmpl = MarkupTemplate('''
''')
>>> print(tmpl.generate(items=[1, 2, 3]))
"""
DIRECTIVE_NAMESPACE = 'http://genshi.edgewall.org/'
XINCLUDE_NAMESPACE = 'http://www.w3.org/2001/XInclude'
directives = [('def', DefDirective),
('match', MatchDirective),
('when', WhenDirective),
('otherwise', OtherwiseDirective),
('for', ForDirective),
('if', IfDirective),
('choose', ChooseDirective),
('with', WithDirective),
('replace', ReplaceDirective),
('content', ContentDirective),
('attrs', AttrsDirective),
('strip', StripDirective)]
serializer = 'xml'
_number_conv = Markup
def __init__(self, source, filepath=None, filename=None, loader=None,
encoding=None, lookup='strict', allow_exec=True):
Template.__init__(self, source, filepath=filepath, filename=filename,
loader=loader, encoding=encoding, lookup=lookup,
allow_exec=allow_exec)
self.add_directives(self.DIRECTIVE_NAMESPACE, self)
def _init_filters(self):
Template._init_filters(self)
# Make sure the include filter comes after the match filter
self.filters.remove(self._include)
self.filters += [self._match, self._include]
def _parse(self, source, encoding):
if not isinstance(source, Stream):
source = XMLParser(source, filename=self.filename,
encoding=encoding)
stream = []
for kind, data, pos in source:
if kind is TEXT:
for kind, data, pos in interpolate(data, self.filepath, pos[1],
pos[2], lookup=self.lookup):
stream.append((kind, data, pos))
elif kind is PI and data[0] == 'python':
if not self.allow_exec:
raise TemplateSyntaxError('Python code blocks not allowed',
self.filepath, *pos[1:])
try:
suite = Suite(data[1], self.filepath, pos[1],
lookup=self.lookup)
except SyntaxError, err:
raise TemplateSyntaxError(err, self.filepath,
pos[1] + (err.lineno or 1) - 1,
pos[2] + (err.offset or 0))
stream.append((EXEC, suite, pos))
elif kind is COMMENT:
if not data.lstrip().startswith('!'):
stream.append((kind, data, pos))
else:
stream.append((kind, data, pos))
return stream
def _extract_directives(self, stream, namespace, factory):
depth = 0
dirmap = {} # temporary mapping of directives to elements
new_stream = []
ns_prefix = {} # namespace prefixes in use
for kind, data, pos in stream:
if kind is START:
tag, attrs = data
directives = []
strip = False
if tag.namespace == namespace:
cls = factory.get_directive(tag.localname)
if cls is None:
raise BadDirectiveError(tag.localname,
self.filepath, pos[1])
args = dict([(name.localname, value) for name, value
in attrs if not name.namespace])
directives.append((factory.get_directive_index(cls), cls,
args, ns_prefix.copy(), pos))
strip = True
new_attrs = []
for name, value in attrs:
if name.namespace == namespace:
cls = factory.get_directive(name.localname)
if cls is None:
raise BadDirectiveError(name.localname,
self.filepath, pos[1])
if type(value) is list and len(value) == 1:
value = value[0][1]
directives.append((factory.get_directive_index(cls),
cls, value, ns_prefix.copy(), pos))
else:
new_attrs.append((name, value))
new_attrs = Attrs(new_attrs)
if directives:
directives.sort()
dirmap[(depth, tag)] = (directives, len(new_stream),
strip)
new_stream.append((kind, (tag, new_attrs), pos))
depth += 1
elif kind is END:
depth -= 1
new_stream.append((kind, data, pos))
# If there have have directive attributes with the
# corresponding start tag, move the events inbetween into
# a "subprogram"
if (depth, data) in dirmap:
directives, offset, strip = dirmap.pop((depth, data))
substream = new_stream[offset:]
if strip:
substream = substream[1:-1]
new_stream[offset:] = [
(SUB, (directives, substream), pos)
]
elif kind is SUB:
directives, substream = data
substream = self._extract_directives(substream, namespace,
factory)
if len(substream) == 1 and substream[0][0] is SUB:
added_directives, substream = substream[0][1]
directives += added_directives
new_stream.append((kind, (directives, substream), pos))
elif kind is START_NS:
# Strip out the namespace declaration for template
# directives
prefix, uri = data
ns_prefix[prefix] = uri
if uri != namespace:
new_stream.append((kind, data, pos))
elif kind is END_NS:
uri = ns_prefix.pop(data, None)
if uri and uri != namespace:
new_stream.append((kind, data, pos))
else:
new_stream.append((kind, data, pos))
return new_stream
def _extract_includes(self, stream):
streams = [[]] # stacked lists of events of the "compiled" template
prefixes = {}
fallbacks = []
includes = []
xinclude_ns = Namespace(self.XINCLUDE_NAMESPACE)
for kind, data, pos in stream:
stream = streams[-1]
if kind is START:
# Record any directive attributes in start tags
tag, attrs = data
if tag in xinclude_ns:
if tag.localname == 'include':
include_href = attrs.get('href')
if not include_href:
raise TemplateSyntaxError('Include misses required '
'attribute "href"',
self.filepath, *pos[1:])
includes.append((include_href, attrs.get('parse')))
streams.append([])
elif tag.localname == 'fallback':
streams.append([])
fallbacks.append(streams[-1])
else:
stream.append((kind, (tag, attrs), pos))
elif kind is END:
if fallbacks and data == xinclude_ns['fallback']:
assert streams.pop() is fallbacks[-1]
elif data == xinclude_ns['include']:
fallback = None
if len(fallbacks) == len(includes):
fallback = fallbacks.pop()
streams.pop() # discard anything between the include tags
# and the fallback element
stream = streams[-1]
href, parse = includes.pop()
try:
cls = {
'xml': MarkupTemplate,
'text': NewTextTemplate
}.get(parse) or self.__class__
except KeyError:
raise TemplateSyntaxError('Invalid value for "parse" '
'attribute of include',
self.filepath, *pos[1:])
stream.append((INCLUDE, (href, cls, fallback), pos))
else:
stream.append((kind, data, pos))
elif kind is START_NS and data[1] == xinclude_ns:
# Strip out the XInclude namespace
prefixes[data[0]] = data[1]
elif kind is END_NS and data in prefixes:
prefixes.pop(data)
else:
stream.append((kind, data, pos))
assert len(streams) == 1
return streams[0]
def _interpolate_attrs(self, stream):
for kind, data, pos in stream:
if kind is START:
# Record any directive attributes in start tags
tag, attrs = data
new_attrs = []
for name, value in attrs:
if value:
value = list(interpolate(value, self.filepath, pos[1],
pos[2], lookup=self.lookup))
if len(value) == 1 and value[0][0] is TEXT:
value = value[0][1]
new_attrs.append((name, value))
data = tag, Attrs(new_attrs)
yield kind, data, pos
def _prepare(self, stream):
return Template._prepare(self,
self._extract_includes(self._interpolate_attrs(stream))
)
def add_directives(self, namespace, factory):
"""Register a custom `DirectiveFactory` for a given namespace.
:param namespace: the namespace URI
:type namespace: `basestring`
:param factory: the directive factory to register
:type factory: `DirectiveFactory`
:since: version 0.6
"""
assert not self._prepared, 'Too late for adding directives, ' \
'template already prepared'
self._stream = self._extract_directives(self._stream, namespace,
factory)
def _match(self, stream, ctxt, start=0, end=None, **vars):
"""Internal stream filter that applies any defined match templates
to the stream.
"""
match_templates = ctxt._match_templates
tail = []
def _strip(stream, append=tail.append):
depth = 1
next = stream.next
while 1:
event = next()
if event[0] is START:
depth += 1
elif event[0] is END:
depth -= 1
if depth > 0:
yield event
else:
append(event)
break
for event in stream:
# We (currently) only care about start and end events for matching
# We might care about namespace events in the future, though
if not match_templates or (event[0] is not START and
event[0] is not END):
yield event
continue
for idx, (test, path, template, hints, namespaces, directives) \
in enumerate(match_templates):
if idx < start or end is not None and idx >= end:
continue
if test(event, namespaces, ctxt) is True:
if 'match_once' in hints:
del match_templates[idx]
idx -= 1
# Let the remaining match templates know about the event so
# they get a chance to update their internal state
for test in [mt[0] for mt in match_templates[idx + 1:]]:
test(event, namespaces, ctxt, updateonly=True)
# Consume and store all events until an end event
# corresponding to this start event is encountered
pre_end = idx + 1
if 'match_once' not in hints and 'not_recursive' in hints:
pre_end -= 1
inner = _strip(stream)
if pre_end > 0:
inner = self._match(inner, ctxt, start=start,
end=pre_end, **vars)
content = self._include(chain([event], inner, tail), ctxt)
if 'not_buffered' not in hints:
content = list(content)
content = Stream(content)
# Make the select() function available in the body of the
# match template
selected = [False]
def select(path):
selected[0] = True
return content.select(path, namespaces, ctxt)
vars = dict(select=select)
# Recursively process the output
template = _apply_directives(template, directives, ctxt,
vars)
for event in self._match(self._flatten(template, ctxt,
**vars),
ctxt, start=idx + 1, **vars):
yield event
# If the match template did not actually call select to
# consume the matched stream, the original events need to
# be consumed here or they'll get appended to the output
if not selected[0]:
for event in content:
pass
# Let the remaining match templates know about the last
# event in the matched content, so they can update their
# internal state accordingly
for test in [mt[0] for mt in match_templates[idx + 1:]]:
test(tail[0], namespaces, ctxt, updateonly=True)
break
else: # no matches
yield event