diff options
Diffstat (limited to 'mwlib/mwapidb.py')
-rw-r--r-- | mwlib/mwapidb.py | 376 |
1 file changed, 0 insertions, 376 deletions
diff --git a/mwlib/mwapidb.py b/mwlib/mwapidb.py deleted file mode 100644 index 4826ef4..0000000 --- a/mwlib/mwapidb.py +++ /dev/null @@ -1,376 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- - -# Copyright (c) 2008, PediaPress GmbH -# See README.txt for additional licensing information. - -import os -import re -import shutil -import tempfile -import time -import urllib -import urllib2 -import urlparse - -import simplejson - -from mwlib import uparser, utils -from mwlib.log import Log - -log = Log("mwapidb") - -try: - from mwlib.licenses import lower2normal -except ImportError: - log.warn('no licenses found') - lower2normal = {} - -# ============================================================================== - - -def fetch_url(url, ignore_errors=False): - log.info("fetching %r" % (url,)) - opener = urllib2.build_opener() - opener.addheaders = [('User-agent', 'mwlib')] - try: - data = opener.open(url).read() - except urllib2.URLError, err: - if ignore_errors: - log.error("%s - while fetching %r" % (err, url)) - return None - raise RuntimeError('Could not fetch %r: %s' % (url, err)) - log.info("got %r (%d Bytes)" % (url, len(data))) - return data - - -# ============================================================================== - - -class APIHelper(object): - def __init__(self, base_url): - """ - @param base_url: base URL (or list of URLs) of a MediaWiki, - i.e. URL path to php scripts, - e.g. 'http://en.wikipedia.org/w/' for English Wikipedia. 
- @type base_url: basestring or [basestring] - """ - - if isinstance(base_url, unicode): - self.base_url = base_url.encode('utf-8') - else: - self.base_url = base_url - if self.base_url[-1] != '/': - self.base_url += '/' - - def query(self, **kwargs): - args = { - 'action': 'query', - 'format': 'json', - } - args.update(**kwargs) - for k, v in args.items(): - if isinstance(v, unicode): - args[k] = v.encode('utf-8') - data = fetch_url('%sapi.php?%s' % (self.base_url, urllib.urlencode(args))) - if data is None: - return None - try: - return simplejson.loads(unicode(data, 'utf-8'))['query'] - except KeyError: - return None - except: - raise RuntimeError('api.php query failed. Are you sure you specified the correct baseurl?') - - def page_query(self, **kwargs): - q = self.query(**kwargs) - if q is None: - return None - try: - page = q['pages'].values()[0] - except (KeyError, IndexError): - return None - if 'missing' in page: - return None - return page - - -# ============================================================================== - - -class ImageDB(object): - def __init__(self, base_url, shared_base_url=None): - self.api_helpers = [APIHelper(base_url)] - if shared_base_url is not None: - self.api_helpers.append(APIHelper(shared_base_url)) - self.tmpdir = tempfile.mkdtemp() - - def clear(self): - shutil.rmtree(self.tmpdir, ignore_errors=True) - - def getURL(self, name, size=None): - """Return image URL for image with given name - - @param name: image name (without namespace, i.e. 
without 'Image:') - @type name: unicode - - @returns: URL to original image - @rtype: str - """ - - assert isinstance(name, unicode), 'name must be of type unicode' - - for api_helper in self.api_helpers: - if size is None: - result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url') - else: - result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url', iiurlwidth=str(size)) - if result is not None: - break - else: - return None - - try: - imageinfo = result['imageinfo'][0] - if size is not None and 'thumburl' in imageinfo: - url = imageinfo['thumburl'] - else: - url = imageinfo['url'] - if url: # url can be False - if url.startswith('/'): - url = urlparse.urljoin(self.api_helpers[0].base_url, url) - return url - return None - except (KeyError, IndexError): - return None - - def getDiskPath(self, name, size=None): - """Return filename for image with given name and size - - @param name: image name (without namespace, i.e. without 'Image:') - @type name: unicode - - @param size: if given, the image is converted to the given maximum width - @type size: int or NoneType - - @returns: filename of image or None if image could not be found - @rtype: basestring - """ - - assert isinstance(name, unicode), 'name must be of type unicode' - - url = self.getURL(name, size=size) - if url is None: - return None - - data = fetch_url(url, ignore_errors=True) - if not data: - return None - - ext = url.rsplit('.')[-1] - if size is not None: - ext = '%dpx.%s' % (size, ext) - else: - ext = '.%s' % ext - filename = os.path.join(self.tmpdir, utils.fsescape(name + ext)) - f = open(filename, 'wb') - f.write(data) - f.close() - return filename - - def getLicense(self, name): - """Return license of image as stated on image description page - - @param name: image name without namespace (e.g. 
without "Image:") - @type name: unicode - - @returns: license of image of None, if no valid license could be found - @rtype: unicode - """ - - assert isinstance(name, unicode), 'name must be of type unicode' - - for api_helper in self.api_helpers: - result = api_helper.page_query(titles='Image:%s' % name, prop='templates') - if result is not None: - break - else: - return None - - try: - templates = [t['title'] for t in result['templates']] - except KeyError: - return None - - for t in templates: - try: - return lower2normal[t.split(':', 1)[-1].lower()] - except KeyError: - pass - - return None - - -# ============================================================================== - - -class WikiDB(object): - print_template = u'Template:Print%s' - - ip_rex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$') - bot_rex = re.compile(r'\bbot\b', re.IGNORECASE) - - def __init__(self, base_url, license, template_blacklist=None): - """ - @param base_url: base URL of a MediaWiki, - e.g. 'http://en.wikipedia.org/w/' - @type base_url: basestring - - @param license: title of an article containing full license text - @type license: unicode - - @param template_blacklist: title of an article containing blacklisted - templates (optional) - @type template_blacklist: unicode - """ - - self.base_url = base_url - self.license = license - self.api_helper = APIHelper(self.base_url) - self.template_cache = {} - self.template_blacklist = [] - if template_blacklist is not None: - raw = self.getRawArticle(template_blacklist) - if raw is None: - log.error('Could not get template blacklist article %r' % template_blacklist) - else: - self.template_blacklist = [template.lower().strip() - for template in re.findall('\* *\[\[.*?:(.*?)\]\]', raw)] - - def getURL(self, title, revision=None): - name = urllib.quote(title.replace(" ", "_").encode('utf-8')) - if revision is None: - return '%sindex.php?title=%s' % (self.base_url, name) - else: - return '%sindex.php?title=%s&oldid=%s' % (self.base_url, 
name, revision) - - def getAuthors(self, title, revision=None, max_num_authors=10): - """Return at most max_num_authors names of non-bot, non-anon users for - non-minor changes of given article (before given revsion). - - @returns: list of principal authors - @rtype: [unicode] - """ - - result = self.api_helper.page_query( - titles=title, - redirects=1, - prop='revisions', - rvprop='user|ids|flags|comment', - rvlimit=500, - ) - if result is None: - return None - - try: - revs = result['revisions'] - except KeyError: - return None - - if revision is not None: - revision = int(revision) - revs = [r for r in revs if r['revid'] < revision] - - authors = [r['user'] for r in revs - if not r.get('anon') - and not self.ip_rex.match(r['user']) - and not r.get('minor') - and not self.bot_rex.search(r.get('comment', '')) - and not self.bot_rex.search(r['user']) - ] - author2count = {} - for a in authors: - try: - author2count[a] += 1 - except KeyError: - author2count[a] = 1 - author2count = author2count.items() - author2count.sort(key=lambda a: -a[1]) - return [a[0] for a in author2count[:max_num_authors]] - - def getTemplate(self, name, followRedirects=True): - """ - Note: *Not* following redirects is unsupported! 
- """ - - try: - return self.template_cache[name] - except KeyError: - pass - - if ":" in name: - name = name.split(':', 1)[1] - - if name.lower() in self.template_blacklist: - log.info("ignoring blacklisted template:" , repr(name)) - return None - - for title in (self.print_template % name, 'Template:%s' % name): - log.info("Trying template %r" % (title,)) - c = self.getRawArticle(title) - if c is not None: - self.template_cache[name] = c - return c - - return None - - def getRawArticle(self, title, revision=None): - if revision is None: - page = self.api_helper.page_query(titles=title, redirects=1, prop='revisions', rvprop='content') - else: - page = self.api_helper.page_query(revids=revision, prop='revisions', rvprop='content') - if page['title'] != title: # given revision could point to another article! - return None - if page is None: - return None - try: - return page['revisions'][0].values()[0] - except KeyError: - return None - - def getMetaData(self): - result = self.api_helper.query(meta='siteinfo') - try: - g = result['general'] - return { - 'license': { - 'name': g['rights'], - 'wikitext': self.getRawArticle(self.license), - }, - 'url': g['base'], - 'name': '%s (%s)' % (g['sitename'], g['lang']), - } - except KeyError: - return None - - def getParsedArticle(self, title, revision=None): - raw = self.getRawArticle(title, revision=revision) - if raw is None: - return None - a = uparser.parseString(title=title, raw=raw, wikidb=self) - return a - - -class Overlay(WikiDB): - def __init__(self, wikidb, templates): - self.__dict__.update(wikidb.__dict__) - self.overlay_templates = templates - - def getTemplate(self, name, followRedirects=False): - try: - return self.overlay_templates[name] - except KeyError: - pass - - return super(Overlay, self).getTemplate(name, followRedirects=followRedirects) - |