Diffstat (limited to 'mwlib/mwapidb.py')
-rw-r--r--  mwlib/mwapidb.py  376
1 file changed, 376 insertions(+), 0 deletions(-)
diff --git a/mwlib/mwapidb.py b/mwlib/mwapidb.py
new file mode 100644
index 0000000..4826ef4
--- /dev/null
+++ b/mwlib/mwapidb.py
@@ -0,0 +1,376 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2008, PediaPress GmbH
+# See README.txt for additional licensing information.
+
+import os
+import re
+import shutil
+import tempfile
+import urllib
+import urllib2
+import urlparse
+
+import simplejson
+
+from mwlib import uparser, utils
+from mwlib.log import Log
+
+log = Log("mwapidb")
+
+try:
+ from mwlib.licenses import lower2normal
+except ImportError:
+ log.warn('no licenses found')
+ lower2normal = {}
+
+# ==============================================================================
+
+
+def fetch_url(url, ignore_errors=False):
+ log.info("fetching %r" % (url,))
+ opener = urllib2.build_opener()
+ opener.addheaders = [('User-agent', 'mwlib')]
+ try:
+ data = opener.open(url).read()
+ except urllib2.URLError, err:
+ if ignore_errors:
+ log.error("%s - while fetching %r" % (err, url))
+ return None
+ raise RuntimeError('Could not fetch %r: %s' % (url, err))
+ log.info("got %r (%d Bytes)" % (url, len(data)))
+ return data
+
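+# Example (illustrative, not part of the original module): fetch a URL,
+# logging errors instead of raising:
+#
+#   data = fetch_url('http://en.wikipedia.org/w/api.php', ignore_errors=True)
+#   if data is None:
+#       pass # error already logged; no data available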
+
+# ==============================================================================
+
+
+class APIHelper(object):
+ def __init__(self, base_url):
+        """
+        @param base_url: base URL of a MediaWiki, i.e. the URL path to its PHP
+            scripts, e.g. 'http://en.wikipedia.org/w/' for the English Wikipedia
+        @type base_url: basestring
+        """
+
+ if isinstance(base_url, unicode):
+ self.base_url = base_url.encode('utf-8')
+ else:
+ self.base_url = base_url
+ if self.base_url[-1] != '/':
+ self.base_url += '/'
+
+ def query(self, **kwargs):
+ args = {
+ 'action': 'query',
+ 'format': 'json',
+ }
+ args.update(**kwargs)
+ for k, v in args.items():
+ if isinstance(v, unicode):
+ args[k] = v.encode('utf-8')
+ data = fetch_url('%sapi.php?%s' % (self.base_url, urllib.urlencode(args)))
+ if data is None:
+ return None
+        try:
+            return simplejson.loads(unicode(data, 'utf-8'))['query']
+        except KeyError:
+            return None
+        except Exception, err:
+            raise RuntimeError('api.php query failed (%s). Are you sure you specified the correct base URL?' % err)
+
+ def page_query(self, **kwargs):
+ q = self.query(**kwargs)
+ if q is None:
+ return None
+ try:
+ page = q['pages'].values()[0]
+ except (KeyError, IndexError):
+ return None
+ if 'missing' in page:
+ return None
+ return page
+
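+# Example (illustrative; the base URL is an assumption -- any MediaWiki with
+# an api.php endpoint works):
+#
+#   helper = APIHelper('http://en.wikipedia.org/w/')
+#   page = helper.page_query(titles=u'Physics', prop='revisions', rvprop='ids')
+#   if page is not None:
+#       print page['title']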
+
+# ==============================================================================
+
+
+class ImageDB(object):
+ def __init__(self, base_url, shared_base_url=None):
+ self.api_helpers = [APIHelper(base_url)]
+ if shared_base_url is not None:
+ self.api_helpers.append(APIHelper(shared_base_url))
+ self.tmpdir = tempfile.mkdtemp()
+
+ def clear(self):
+ shutil.rmtree(self.tmpdir, ignore_errors=True)
+
+ def getURL(self, name, size=None):
+        """Return URL for image with given name (and optional maximum width)
+
+        @param name: image name (without namespace, i.e. without 'Image:')
+        @type name: unicode
+
+        @param size: if given, the URL of a thumbnail with this maximum width
+            is returned instead of the URL of the original image
+        @type size: int or NoneType
+
+        @returns: URL to image
+        @rtype: str
+        """
+
+ assert isinstance(name, unicode), 'name must be of type unicode'
+
+ for api_helper in self.api_helpers:
+ if size is None:
+ result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url')
+ else:
+ result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url', iiurlwidth=str(size))
+ if result is not None:
+ break
+ else:
+ return None
+
+ try:
+ imageinfo = result['imageinfo'][0]
+ if size is not None and 'thumburl' in imageinfo:
+ url = imageinfo['thumburl']
+ else:
+ url = imageinfo['url']
+ if url: # url can be False
+ if url.startswith('/'):
+ url = urlparse.urljoin(self.api_helpers[0].base_url, url)
+ return url
+ return None
+ except (KeyError, IndexError):
+ return None
+
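+    # Example (illustrative; the image name is an assumption):
+    #
+    #   imagedb.getURL(u'Example.jpg')           # URL of the original image
+    #   imagedb.getURL(u'Example.jpg', size=180) # URL of a 180px wide thumbnail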
+ def getDiskPath(self, name, size=None):
+ """Return filename for image with given name and size
+
+ @param name: image name (without namespace, i.e. without 'Image:')
+ @type name: unicode
+
+ @param size: if given, the image is converted to the given maximum width
+ @type size: int or NoneType
+
+ @returns: filename of image or None if image could not be found
+ @rtype: basestring
+ """
+
+ assert isinstance(name, unicode), 'name must be of type unicode'
+
+ url = self.getURL(name, size=size)
+ if url is None:
+ return None
+
+ data = fetch_url(url, ignore_errors=True)
+ if not data:
+ return None
+
+        # derive the target filename from the image name, the requested size
+        # and the original file extension
+        ext = url.rsplit('.', 1)[-1]
+        if size is not None:
+            ext = '.%dpx.%s' % (size, ext)
+        else:
+            ext = '.%s' % ext
+ filename = os.path.join(self.tmpdir, utils.fsescape(name + ext))
+ f = open(filename, 'wb')
+ f.write(data)
+ f.close()
+ return filename
+
+ def getLicense(self, name):
+ """Return license of image as stated on image description page
+
+        @param name: image name (without namespace, i.e. without 'Image:')
+ @type name: unicode
+
+        @returns: license of image or None, if no valid license could be found
+ @rtype: unicode
+ """
+
+ assert isinstance(name, unicode), 'name must be of type unicode'
+
+ for api_helper in self.api_helpers:
+ result = api_helper.page_query(titles='Image:%s' % name, prop='templates')
+ if result is not None:
+ break
+ else:
+ return None
+
+ try:
+ templates = [t['title'] for t in result['templates']]
+ except KeyError:
+ return None
+
+ for t in templates:
+ try:
+ return lower2normal[t.split(':', 1)[-1].lower()]
+ except KeyError:
+ pass
+
+ return None
+
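+# Illustrative ImageDB usage (URLs and image name are assumptions):
+#
+#   imagedb = ImageDB('http://en.wikipedia.org/w/',
+#                     shared_base_url='http://commons.wikimedia.org/w/')
+#   path = imagedb.getDiskPath(u'Example.jpg', size=180) # downloads a thumbnail
+#   license = imagedb.getLicense(u'Example.jpg')
+#   imagedb.clear() # remove the temporary download directory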
+
+# ==============================================================================
+
+
+class WikiDB(object):
+ print_template = u'Template:Print%s'
+
+ ip_rex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
+ bot_rex = re.compile(r'\bbot\b', re.IGNORECASE)
+
+ def __init__(self, base_url, license, template_blacklist=None):
+ """
+ @param base_url: base URL of a MediaWiki,
+ e.g. 'http://en.wikipedia.org/w/'
+ @type base_url: basestring
+
+ @param license: title of an article containing full license text
+ @type license: unicode
+
+ @param template_blacklist: title of an article containing blacklisted
+ templates (optional)
+ @type template_blacklist: unicode
+ """
+
+ self.base_url = base_url
+ self.license = license
+ self.api_helper = APIHelper(self.base_url)
+ self.template_cache = {}
+ self.template_blacklist = []
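+        # the blacklist article is expected to link the banned templates one
+        # per line, e.g. "* [[Template:Navbox]]" (template name assumed here)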
+ if template_blacklist is not None:
+ raw = self.getRawArticle(template_blacklist)
+ if raw is None:
+ log.error('Could not get template blacklist article %r' % template_blacklist)
+ else:
+                self.template_blacklist = [template.lower().strip()
+                                           for template in re.findall(r'\* *\[\[.*?:(.*?)\]\]', raw)]
+
+ def getURL(self, title, revision=None):
+ name = urllib.quote(title.replace(" ", "_").encode('utf-8'))
+ if revision is None:
+ return '%sindex.php?title=%s' % (self.base_url, name)
+ else:
+ return '%sindex.php?title=%s&oldid=%s' % (self.base_url, name, revision)
+
+ def getAuthors(self, title, revision=None, max_num_authors=10):
+ """Return at most max_num_authors names of non-bot, non-anon users for
+        non-minor changes of the given article (before the given revision).
+
+ @returns: list of principal authors
+ @rtype: [unicode]
+ """
+
+ result = self.api_helper.page_query(
+ titles=title,
+ redirects=1,
+ prop='revisions',
+ rvprop='user|ids|flags|comment',
+ rvlimit=500,
+ )
+ if result is None:
+ return None
+
+ try:
+ revs = result['revisions']
+ except KeyError:
+ return None
+
+ if revision is not None:
+ revision = int(revision)
+ revs = [r for r in revs if r['revid'] < revision]
+
+ authors = [r['user'] for r in revs
+ if not r.get('anon')
+ and not self.ip_rex.match(r['user'])
+ and not r.get('minor')
+ and not self.bot_rex.search(r.get('comment', ''))
+ and not self.bot_rex.search(r['user'])
+ ]
+ author2count = {}
+ for a in authors:
+ try:
+ author2count[a] += 1
+ except KeyError:
+ author2count[a] = 1
+ author2count = author2count.items()
+ author2count.sort(key=lambda a: -a[1])
+ return [a[0] for a in author2count[:max_num_authors]]
+
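+    # Example (illustrative; the article title is an assumption):
+    #
+    #   authors = wikidb.getAuthors(u'Physics', max_num_authors=5)
+    #   # -> up to 5 user names, ordered by number of qualifying revisions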
+ def getTemplate(self, name, followRedirects=True):
+ """
+ Note: *Not* following redirects is unsupported!
+ """
+
+ try:
+ return self.template_cache[name]
+ except KeyError:
+ pass
+
+ if ":" in name:
+ name = name.split(':', 1)[1]
+
+ if name.lower() in self.template_blacklist:
+            log.info("ignoring blacklisted template: %r" % (name,))
+ return None
+
+ for title in (self.print_template % name, 'Template:%s' % name):
+ log.info("Trying template %r" % (title,))
+ c = self.getRawArticle(title)
+ if c is not None:
+ self.template_cache[name] = c
+ return c
+
+ return None
+
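+    # Note (illustrative): for name u'Navbox' this tries the article
+    # u'Template:PrintNavbox' first and falls back to u'Template:Navbox'.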
+ def getRawArticle(self, title, revision=None):
+        if revision is None:
+            page = self.api_helper.page_query(titles=title, redirects=1, prop='revisions', rvprop='content')
+        else:
+            page = self.api_helper.page_query(revids=revision, prop='revisions', rvprop='content')
+        if page is None:
+            return None
+        if revision is not None and page['title'] != title: # given revision could point to another article!
+            return None
+        try:
+            # with rvprop='content', the wikitext is stored under the '*' key
+            return page['revisions'][0]['*']
+        except (KeyError, IndexError):
+            return None
+
+ def getMetaData(self):
+        result = self.api_helper.query(meta='siteinfo')
+        if result is None:
+            return None
+ try:
+ g = result['general']
+ return {
+ 'license': {
+ 'name': g['rights'],
+ 'wikitext': self.getRawArticle(self.license),
+ },
+ 'url': g['base'],
+ 'name': '%s (%s)' % (g['sitename'], g['lang']),
+ }
+ except KeyError:
+ return None
+
+ def getParsedArticle(self, title, revision=None):
+ raw = self.getRawArticle(title, revision=revision)
+ if raw is None:
+ return None
+ a = uparser.parseString(title=title, raw=raw, wikidb=self)
+ return a
+
+
+class Overlay(WikiDB):
+ def __init__(self, wikidb, templates):
+ self.__dict__.update(wikidb.__dict__)
+ self.overlay_templates = templates
+
+ def getTemplate(self, name, followRedirects=False):
+ try:
+ return self.overlay_templates[name]
+ except KeyError:
+ pass
+
+ return super(Overlay, self).getTemplate(name, followRedirects=followRedirects)
+
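+# Illustrative Overlay usage (names are assumptions): substitute template
+# bodies without modifying the underlying WikiDB:
+#
+#   wikidb = WikiDB('http://en.wikipedia.org/w/', license=u'Wikipedia:Copyrights')
+#   overlay = Overlay(wikidb, {u'Infobox': u''}) # render Infobox as empty
+#   overlay.getTemplate(u'Infobox') # -> u'' from the overlay, no API request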