Diffstat (limited to 'mwlib/mwapidb.py')
-rw-r--r-- | mwlib/mwapidb.py | 376
1 file changed, 376 insertions, 0 deletions
diff --git a/mwlib/mwapidb.py b/mwlib/mwapidb.py
new file mode 100644
index 0000000..4826ef4
--- /dev/null
+++ b/mwlib/mwapidb.py
@@ -0,0 +1,376 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2008, PediaPress GmbH
+# See README.txt for additional licensing information.
+
+import os
+import re
+import shutil
+import tempfile
+import time
+import urllib
+import urllib2
+import urlparse
+
+import simplejson
+
+from mwlib import uparser, utils
+from mwlib.log import Log
+
+log = Log("mwapidb")
+
+try:
+    from mwlib.licenses import lower2normal
+except ImportError:
+    log.warn('no licenses found')
+    lower2normal = {}
+
+# ==============================================================================
+
+
+def fetch_url(url, ignore_errors=False):
+    log.info("fetching %r" % (url,))
+    opener = urllib2.build_opener()
+    opener.addheaders = [('User-agent', 'mwlib')]
+    try:
+        data = opener.open(url).read()
+    except urllib2.URLError, err:
+        if ignore_errors:
+            log.error("%s - while fetching %r" % (err, url))
+            return None
+        raise RuntimeError('Could not fetch %r: %s' % (url, err))
+    log.info("got %r (%d Bytes)" % (url, len(data)))
+    return data
+
+
+# ==============================================================================
+
+
+class APIHelper(object):
+    def __init__(self, base_url):
+        """
+        @param base_url: base URL (or list of URLs) of a MediaWiki,
+            i.e. URL path to php scripts,
+            e.g. 'http://en.wikipedia.org/w/' for English Wikipedia.
+        @type base_url: basestring or [basestring]
+        """
+
+        if isinstance(base_url, unicode):
+            self.base_url = base_url.encode('utf-8')
+        else:
+            self.base_url = base_url
+        if self.base_url[-1] != '/':
+            self.base_url += '/'
+
+    def query(self, **kwargs):
+        args = {
+            'action': 'query',
+            'format': 'json',
+        }
+        args.update(**kwargs)
+        for k, v in args.items():
+            if isinstance(v, unicode):
+                args[k] = v.encode('utf-8')
+        data = fetch_url('%sapi.php?%s' % (self.base_url, urllib.urlencode(args)))
+        if data is None:
+            return None
+        try:
+            return simplejson.loads(unicode(data, 'utf-8'))['query']
+        except KeyError:
+            return None
+        except:
+            raise RuntimeError('api.php query failed. Are you sure you specified the correct baseurl?')
+
+    def page_query(self, **kwargs):
+        q = self.query(**kwargs)
+        if q is None:
+            return None
+        try:
+            page = q['pages'].values()[0]
+        except (KeyError, IndexError):
+            return None
+        if 'missing' in page:
+            return None
+        return page
+
+
+# ==============================================================================
+
+
+class ImageDB(object):
+    def __init__(self, base_url, shared_base_url=None):
+        self.api_helpers = [APIHelper(base_url)]
+        if shared_base_url is not None:
+            self.api_helpers.append(APIHelper(shared_base_url))
+        self.tmpdir = tempfile.mkdtemp()
+
+    def clear(self):
+        shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+    def getURL(self, name, size=None):
+        """Return image URL for image with given name
+
+        @param name: image name (without namespace, i.e. without 'Image:')
+        @type name: unicode
+
+        @returns: URL to original image
+        @rtype: str
+        """
+
+        assert isinstance(name, unicode), 'name must be of type unicode'
+
+        for api_helper in self.api_helpers:
+            if size is None:
+                result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url')
+            else:
+                result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url', iiurlwidth=str(size))
+            if result is not None:
+                break
+        else:
+            return None
+
+        try:
+            imageinfo = result['imageinfo'][0]
+            if size is not None and 'thumburl' in imageinfo:
+                url = imageinfo['thumburl']
+            else:
+                url = imageinfo['url']
+            if url:  # url can be False
+                if url.startswith('/'):
+                    url = urlparse.urljoin(self.api_helpers[0].base_url, url)
+                return url
+            return None
+        except (KeyError, IndexError):
+            return None
+
+    def getDiskPath(self, name, size=None):
+        """Return filename for image with given name and size
+
+        @param name: image name (without namespace, i.e. without 'Image:')
+        @type name: unicode
+
+        @param size: if given, the image is converted to the given maximum width
+        @type size: int or NoneType
+
+        @returns: filename of image or None if image could not be found
+        @rtype: basestring
+        """
+
+        assert isinstance(name, unicode), 'name must be of type unicode'
+
+        url = self.getURL(name, size=size)
+        if url is None:
+            return None
+
+        data = fetch_url(url, ignore_errors=True)
+        if not data:
+            return None
+
+        ext = url.rsplit('.')[-1]
+        if size is not None:
+            ext = '%dpx.%s' % (size, ext)
+        else:
+            ext = '.%s' % ext
+        filename = os.path.join(self.tmpdir, utils.fsescape(name + ext))
+        f = open(filename, 'wb')
+        f.write(data)
+        f.close()
+        return filename
+
+    def getLicense(self, name):
+        """Return license of image as stated on image description page
+
+        @param name: image name without namespace (e.g. without "Image:")
+        @type name: unicode
+
+        @returns: license of image or None, if no valid license could be found
+        @rtype: unicode
+        """
+
+        assert isinstance(name, unicode), 'name must be of type unicode'
+
+        for api_helper in self.api_helpers:
+            result = api_helper.page_query(titles='Image:%s' % name, prop='templates')
+            if result is not None:
+                break
+        else:
+            return None
+
+        try:
+            templates = [t['title'] for t in result['templates']]
+        except KeyError:
+            return None
+
+        for t in templates:
+            try:
+                return lower2normal[t.split(':', 1)[-1].lower()]
+            except KeyError:
+                pass
+
+        return None
+
+
+# ==============================================================================
+
+
+class WikiDB(object):
+    print_template = u'Template:Print%s'
+
+    ip_rex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
+    bot_rex = re.compile(r'\bbot\b', re.IGNORECASE)
+
+    def __init__(self, base_url, license, template_blacklist=None):
+        """
+        @param base_url: base URL of a MediaWiki,
+            e.g. 'http://en.wikipedia.org/w/'
+        @type base_url: basestring
+
+        @param license: title of an article containing full license text
+        @type license: unicode
+
+        @param template_blacklist: title of an article containing blacklisted
+            templates (optional)
+        @type template_blacklist: unicode
+        """
+
+        self.base_url = base_url
+        self.license = license
+        self.api_helper = APIHelper(self.base_url)
+        self.template_cache = {}
+        self.template_blacklist = []
+        if template_blacklist is not None:
+            raw = self.getRawArticle(template_blacklist)
+            if raw is None:
+                log.error('Could not get template blacklist article %r' % template_blacklist)
+            else:
+                self.template_blacklist = [template.lower().strip()
+                    for template in re.findall('\* *\[\[.*?:(.*?)\]\]', raw)]
+
+    def getURL(self, title, revision=None):
+        name = urllib.quote(title.replace(" ", "_").encode('utf-8'))
+        if revision is None:
+            return '%sindex.php?title=%s' % (self.base_url, name)
+        else:
+            return '%sindex.php?title=%s&oldid=%s' % (self.base_url, name, revision)
+
+    def getAuthors(self, title, revision=None, max_num_authors=10):
+        """Return at most max_num_authors names of non-bot, non-anon users for
+        non-minor changes of given article (before given revision).
+
+        @returns: list of principal authors
+        @rtype: [unicode]
+        """
+
+        result = self.api_helper.page_query(
+            titles=title,
+            redirects=1,
+            prop='revisions',
+            rvprop='user|ids|flags|comment',
+            rvlimit=500,
+        )
+        if result is None:
+            return None
+
+        try:
+            revs = result['revisions']
+        except KeyError:
+            return None
+
+        if revision is not None:
+            revision = int(revision)
+            revs = [r for r in revs if r['revid'] < revision]
+
+        authors = [r['user'] for r in revs
+            if not r.get('anon')
+            and not self.ip_rex.match(r['user'])
+            and not r.get('minor')
+            and not self.bot_rex.search(r.get('comment', ''))
+            and not self.bot_rex.search(r['user'])
+        ]
+        author2count = {}
+        for a in authors:
+            try:
+                author2count[a] += 1
+            except KeyError:
+                author2count[a] = 1
+        author2count = author2count.items()
+        author2count.sort(key=lambda a: -a[1])
+        return [a[0] for a in author2count[:max_num_authors]]
+
+    def getTemplate(self, name, followRedirects=True):
+        """
+        Note: *Not* following redirects is unsupported!
+        """
+
+        try:
+            return self.template_cache[name]
+        except KeyError:
+            pass
+
+        if ":" in name:
+            name = name.split(':', 1)[1]
+
+        if name.lower() in self.template_blacklist:
+            log.info("ignoring blacklisted template: %r" % name)
+            return None
+
+        for title in (self.print_template % name, 'Template:%s' % name):
+            log.info("Trying template %r" % (title,))
+            c = self.getRawArticle(title)
+            if c is not None:
+                self.template_cache[name] = c
+                return c
+
+        return None
+
+    def getRawArticle(self, title, revision=None):
+        if revision is None:
+            page = self.api_helper.page_query(titles=title, redirects=1, prop='revisions', rvprop='content')
+        else:
+            page = self.api_helper.page_query(revids=revision, prop='revisions', rvprop='content')
+        if page is None:
+            return None
+        if revision is not None and page['title'] != title:  # given revision could point to another article!
+            return None
+        try:
+            return page['revisions'][0].values()[0]
+        except KeyError:
+            return None
+
+    def getMetaData(self):
+        result = self.api_helper.query(meta='siteinfo')
+        try:
+            g = result['general']
+            return {
+                'license': {
+                    'name': g['rights'],
+                    'wikitext': self.getRawArticle(self.license),
+                },
+                'url': g['base'],
+                'name': '%s (%s)' % (g['sitename'], g['lang']),
+            }
+        except KeyError:
+            return None
+
+    def getParsedArticle(self, title, revision=None):
+        raw = self.getRawArticle(title, revision=revision)
+        if raw is None:
+            return None
+        a = uparser.parseString(title=title, raw=raw, wikidb=self)
+        return a
+
+
+class Overlay(WikiDB):
+    def __init__(self, wikidb, templates):
+        self.__dict__.update(wikidb.__dict__)
+        self.overlay_templates = templates
+
+    def getTemplate(self, name, followRedirects=False):
+        try:
+            return self.overlay_templates[name]
+        except KeyError:
+            pass
+
+        return super(Overlay, self).getTemplate(name, followRedirects=followRedirects)
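As a rough usage sketch (not part of this commit): the classes added here are meant to be driven by other mwlib code. The base URL, article titles and image name below are illustrative assumptions, and error handling is omitted.

    from mwlib import mwapidb

    wikidb = mwapidb.WikiDB(
        base_url='http://en.wikipedia.org/w/',  # assumed MediaWiki script path
        license=u'Wikipedia:Text of the GNU Free Documentation License',  # assumed license article
    )
    raw = wikidb.getRawArticle(u'Physics')         # wikitext of the current revision, or None
    article = wikidb.getParsedArticle(u'Physics')  # parse tree built via uparser.parseString()
    authors = wikidb.getAuthors(u'Physics')        # up to 10 principal non-bot, non-anon authors

    imagedb = mwapidb.ImageDB('http://en.wikipedia.org/w/')
    path = imagedb.getDiskPath(u'Example.jpg', size=180)  # downloads a 180px thumbnail, or None
    imagedb.clear()                                        # removes the temporary download directory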