# -*- coding: utf-8 -*- # # Copyright (C) 2006-2009 Edgewall Software # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at http://genshi.edgewall.org/wiki/License. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at http://genshi.edgewall.org/log/. """Support for constructing markup streams from files, strings, or other sources. """ from itertools import chain import htmlentitydefs as entities import HTMLParser as html from StringIO import StringIO from xml.parsers import expat from genshi.core import Attrs, QName, Stream, stripentities from genshi.core import START, END, XML_DECL, DOCTYPE, TEXT, START_NS, \ END_NS, START_CDATA, END_CDATA, PI, COMMENT __all__ = ['ET', 'ParseError', 'XMLParser', 'XML', 'HTMLParser', 'HTML'] __docformat__ = 'restructuredtext en' def ET(element): """Convert a given ElementTree element to a markup stream. :param element: an ElementTree element :return: a markup stream """ tag_name = QName(element.tag.lstrip('{')) attrs = Attrs([(QName(attr.lstrip('{')), value) for attr, value in element.items()]) yield START, (tag_name, attrs), (None, -1, -1) if element.text: yield TEXT, element.text, (None, -1, -1) for child in element.getchildren(): for item in ET(child): yield item yield END, tag_name, (None, -1, -1) if element.tail: yield TEXT, element.tail, (None, -1, -1) class ParseError(Exception): """Exception raised when fatal syntax errors are found in the input being parsed. """ def __init__(self, message, filename=None, lineno=-1, offset=-1): """Exception initializer. :param message: the error message from the parser :param filename: the path to the file that was parsed :param lineno: the number of the line on which the error was encountered :param offset: the column number where the error was encountered """ self.msg = message if filename: message += ', in ' + filename Exception.__init__(self, message) self.filename = filename or '' self.lineno = lineno self.offset = offset class XMLParser(object): """Generator-based XML parser based on roughly equivalent code in Kid/ElementTree. The parsing is initiated by iterating over the parser object: >>> parser = XMLParser(StringIO('Foo')) >>> for kind, data, pos in parser: ... print('%s %s' % (kind, data)) START (QName('root'), Attrs([(QName('id'), u'2')])) START (QName('child'), Attrs()) TEXT Foo END child END root """ _entitydefs = ['' % (name, value) for name, value in entities.name2codepoint.items()] _external_dtd = '\n'.join(_entitydefs) def __init__(self, source, filename=None, encoding=None): """Initialize the parser for the given XML input. :param source: the XML text as a file-like object :param filename: the name of the file, if appropriate :param encoding: the encoding of the file; if not specified, the encoding is assumed to be ASCII, UTF-8, or UTF-16, or whatever the encoding specified in the XML declaration (if any) """ self.source = source self.filename = filename # Setup the Expat parser parser = expat.ParserCreate(encoding, '}') parser.buffer_text = True parser.returns_unicode = True parser.ordered_attributes = True parser.StartElementHandler = self._handle_start parser.EndElementHandler = self._handle_end parser.CharacterDataHandler = self._handle_data parser.StartDoctypeDeclHandler = self._handle_doctype parser.StartNamespaceDeclHandler = self._handle_start_ns parser.EndNamespaceDeclHandler = self._handle_end_ns parser.StartCdataSectionHandler = self._handle_start_cdata parser.EndCdataSectionHandler = self._handle_end_cdata parser.ProcessingInstructionHandler = self._handle_pi parser.XmlDeclHandler = self._handle_xml_decl parser.CommentHandler = self._handle_comment # Tell Expat that we'll handle non-XML entities ourselves # (in _handle_other) parser.DefaultHandler = self._handle_other parser.SetParamEntityParsing(expat.XML_PARAM_ENTITY_PARSING_ALWAYS) parser.UseForeignDTD() parser.ExternalEntityRefHandler = self._build_foreign self.expat = parser self._queue = [] def parse(self): """Generator that parses the XML source, yielding markup events. :return: a markup event stream :raises ParseError: if the XML text is not well formed """ def _generate(): try: bufsize = 4 * 1024 # 4K done = False while 1: while not done and len(self._queue) == 0: data = self.source.read(bufsize) if data == '': # end of data if hasattr(self, 'expat'): self.expat.Parse('', True) del self.expat # get rid of circular references done = True else: if isinstance(data, unicode): data = data.encode('utf-8') self.expat.Parse(data, False) for event in self._queue: yield event self._queue = [] if done: break except expat.ExpatError, e: msg = str(e) raise ParseError(msg, self.filename, e.lineno, e.offset) return Stream(_generate()).filter(_coalesce) def __iter__(self): return iter(self.parse()) def _build_foreign(self, context, base, sysid, pubid): parser = self.expat.ExternalEntityParserCreate(context) parser.ParseFile(StringIO(self._external_dtd)) return 1 def _enqueue(self, kind, data=None, pos=None): if pos is None: pos = self._getpos() if kind is TEXT: # Expat reports the *end* of the text event as current position. We # try to fix that up here as much as possible. Unfortunately, the # offset is only valid for single-line text. For multi-line text, # it is apparently not possible to determine at what offset it # started if '\n' in data: lines = data.splitlines() lineno = pos[1] - len(lines) + 1 offset = -1 else: lineno = pos[1] offset = pos[2] - len(data) pos = (pos[0], lineno, offset) self._queue.append((kind, data, pos)) def _getpos_unknown(self): return (self.filename, -1, -1) def _getpos(self): return (self.filename, self.expat.CurrentLineNumber, self.expat.CurrentColumnNumber) def _handle_start(self, tag, attrib): attrs = Attrs([(QName(name), value) for name, value in zip(*[iter(attrib)] * 2)]) self._enqueue(START, (QName(tag), attrs)) def _handle_end(self, tag): self._enqueue(END, QName(tag)) def _handle_data(self, text): self._enqueue(TEXT, text) def _handle_xml_decl(self, version, encoding, standalone): self._enqueue(XML_DECL, (version, encoding, standalone)) def _handle_doctype(self, name, sysid, pubid, has_internal_subset): self._enqueue(DOCTYPE, (name, pubid, sysid)) def _handle_start_ns(self, prefix, uri): self._enqueue(START_NS, (prefix or '', uri)) def _handle_end_ns(self, prefix): self._enqueue(END_NS, prefix or '') def _handle_start_cdata(self): self._enqueue(START_CDATA) def _handle_end_cdata(self): self._enqueue(END_CDATA) def _handle_pi(self, target, data): self._enqueue(PI, (target, data)) def _handle_comment(self, text): self._enqueue(COMMENT, text) def _handle_other(self, text): if text.startswith('&'): # deal with undefined entities try: text = unichr(entities.name2codepoint[text[1:-1]]) self._enqueue(TEXT, text) except KeyError: filename, lineno, offset = self._getpos() error = expat.error('undefined entity "%s": line %d, column %d' % (text, lineno, offset)) error.code = expat.errors.XML_ERROR_UNDEFINED_ENTITY error.lineno = lineno error.offset = offset raise error def XML(text): """Parse the given XML source and return a markup stream. Unlike with `XMLParser`, the returned stream is reusable, meaning it can be iterated over multiple times: >>> xml = XML('FooBar') >>> print(xml) FooBar >>> print(xml.select('elem')) FooBar >>> print(xml.select('elem/text()')) FooBar :param text: the XML source :return: the parsed XML event stream :raises ParseError: if the XML text is not well-formed """ return Stream(list(XMLParser(StringIO(text)))) class HTMLParser(html.HTMLParser, object): """Parser for HTML input based on the Python `HTMLParser` module. This class provides the same interface for generating stream events as `XMLParser`, and attempts to automatically balance tags. The parsing is initiated by iterating over the parser object: >>> parser = HTMLParser(StringIO('')) >>> for kind, data, pos in parser: ... print('%s %s' % (kind, data)) START (QName('ul'), Attrs([(QName('compact'), u'compact')])) START (QName('li'), Attrs()) TEXT Foo END li END ul """ _EMPTY_ELEMS = frozenset(['area', 'base', 'basefont', 'br', 'col', 'frame', 'hr', 'img', 'input', 'isindex', 'link', 'meta', 'param']) def __init__(self, source, filename=None, encoding='utf-8'): """Initialize the parser for the given HTML input. :param source: the HTML text as a file-like object :param filename: the name of the file, if known :param filename: encoding of the file; ignored if the input is unicode """ html.HTMLParser.__init__(self) self.source = source self.filename = filename self.encoding = encoding self._queue = [] self._open_tags = [] def parse(self): """Generator that parses the HTML source, yielding markup events. :return: a markup event stream :raises ParseError: if the HTML text is not well formed """ def _generate(): try: bufsize = 4 * 1024 # 4K done = False while 1: while not done and len(self._queue) == 0: data = self.source.read(bufsize) if data == '': # end of data self.close() done = True else: self.feed(data) for kind, data, pos in self._queue: yield kind, data, pos self._queue = [] if done: open_tags = self._open_tags open_tags.reverse() for tag in open_tags: yield END, QName(tag), pos break except html.HTMLParseError, e: msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset) raise ParseError(msg, self.filename, e.lineno, e.offset) return Stream(_generate()).filter(_coalesce) def __iter__(self): return iter(self.parse()) def _enqueue(self, kind, data, pos=None): if pos is None: pos = self._getpos() self._queue.append((kind, data, pos)) def _getpos(self): lineno, column = self.getpos() return (self.filename, lineno, column) def handle_starttag(self, tag, attrib): fixed_attrib = [] for name, value in attrib: # Fixup minimized attributes if value is None: value = unicode(name) elif not isinstance(value, unicode): value = value.decode(self.encoding, 'replace') fixed_attrib.append((QName(name), stripentities(value))) self._enqueue(START, (QName(tag), Attrs(fixed_attrib))) if tag in self._EMPTY_ELEMS: self._enqueue(END, QName(tag)) else: self._open_tags.append(tag) def handle_endtag(self, tag): if tag not in self._EMPTY_ELEMS: while self._open_tags: open_tag = self._open_tags.pop() self._enqueue(END, QName(open_tag)) if open_tag.lower() == tag.lower(): break def handle_data(self, text): if not isinstance(text, unicode): text = text.decode(self.encoding, 'replace') self._enqueue(TEXT, text) def handle_charref(self, name): if name.lower().startswith('x'): text = unichr(int(name[1:], 16)) else: text = unichr(int(name)) self._enqueue(TEXT, text) def handle_entityref(self, name): try: text = unichr(entities.name2codepoint[name]) except KeyError: text = '&%s;' % name self._enqueue(TEXT, text) def handle_pi(self, data): target, data = data.split(None, 1) if data.endswith('?'): data = data[:-1] self._enqueue(PI, (target.strip(), data.strip())) def handle_comment(self, text): self._enqueue(COMMENT, text) def HTML(text, encoding='utf-8'): """Parse the given HTML source and return a markup stream. Unlike with `HTMLParser`, the returned stream is reusable, meaning it can be iterated over multiple times: >>> html = HTML('

Foo

') >>> print(html)

Foo

>>> print(html.select('h1'))

Foo

>>> print(html.select('h1/text()')) Foo :param text: the HTML source :return: the parsed XML event stream :raises ParseError: if the HTML text is not well-formed, and error recovery fails """ return Stream(list(HTMLParser(StringIO(text), encoding=encoding))) def _coalesce(stream): """Coalesces adjacent TEXT events into a single event.""" textbuf = [] textpos = None for kind, data, pos in chain(stream, [(None, None, None)]): if kind is TEXT: textbuf.append(data) if textpos is None: textpos = pos else: if textbuf: yield TEXT, ''.join(textbuf), textpos del textbuf[:] textpos = None if kind: yield kind, data, pos