Diffstat (limited to 'mwlib/mwapidb.py')
-rw-r--r--  mwlib/mwapidb.py  376
1 file changed, 0 insertions(+), 376 deletions(-)
diff --git a/mwlib/mwapidb.py b/mwlib/mwapidb.py
deleted file mode 100644
index 4826ef4..0000000
--- a/mwlib/mwapidb.py
+++ /dev/null
@@ -1,376 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# Copyright (c) 2008, PediaPress GmbH
-# See README.txt for additional licensing information.
-
-import os
-import re
-import shutil
-import tempfile
-import time
-import urllib
-import urllib2
-import urlparse
-
-import simplejson
-
-from mwlib import uparser, utils
-from mwlib.log import Log
-
-log = Log("mwapidb")
-
-try:
-    from mwlib.licenses import lower2normal
-except ImportError:
-    log.warn('no licenses found')
-    lower2normal = {}
-
-# ==============================================================================
-
-
-def fetch_url(url, ignore_errors=False):
-    log.info("fetching %r" % (url,))
-    opener = urllib2.build_opener()
-    opener.addheaders = [('User-agent', 'mwlib')]
-    try:
-        data = opener.open(url).read()
-    except urllib2.URLError, err:
-        if ignore_errors:
-            log.error("%s - while fetching %r" % (err, url))
-            return None
-        raise RuntimeError('Could not fetch %r: %s' % (url, err))
-    log.info("got %r (%d Bytes)" % (url, len(data)))
-    return data
-
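-# A minimal usage sketch for fetch_url (the query URL below is only an
-# illustrative example, not a fixed endpoint):
-#
-#     data = fetch_url('http://en.wikipedia.org/w/api.php?action=query&format=json',
-#                      ignore_errors=True)
-#     if data is not None:
-#         log.info('fetched %d bytes' % len(data))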
-
-# ==============================================================================
-
-
-class APIHelper(object):
-    def __init__(self, base_url):
-        """
-        @param base_url: base URL (or list of URLs) of a MediaWiki,
-            i.e. URL path to php scripts,
-            e.g. 'http://en.wikipedia.org/w/' for English Wikipedia.
-        @type base_url: basestring or [basestring]
-        """
-
-        if isinstance(base_url, unicode):
-            self.base_url = base_url.encode('utf-8')
-        else:
-            self.base_url = base_url
-        if self.base_url[-1] != '/':
-            self.base_url += '/'
-
-    def query(self, **kwargs):
-        args = {
-            'action': 'query',
-            'format': 'json',
-        }
-        args.update(**kwargs)
-        for k, v in args.items():
-            if isinstance(v, unicode):
-                args[k] = v.encode('utf-8')
-        data = fetch_url('%sapi.php?%s' % (self.base_url, urllib.urlencode(args)))
-        if data is None:
-            return None
-        try:
-            return simplejson.loads(unicode(data, 'utf-8'))['query']
-        except KeyError:
-            return None
-        except Exception:
-            raise RuntimeError('api.php query failed. Are you sure you specified the correct baseurl?')
-
-    def page_query(self, **kwargs):
-        q = self.query(**kwargs)
-        if q is None:
-            return None
-        try:
-            page = q['pages'].values()[0]
-        except (KeyError, IndexError):
-            return None
-        if 'missing' in page:
-            return None
-        return page
-
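-# A rough usage sketch for APIHelper (the base URL and title are example
-# values):
-#
-#     helper = APIHelper('http://en.wikipedia.org/w/')
-#     page = helper.page_query(titles=u'Main Page', prop='info')
-#     # page is None if the query failed or the page is missing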
-
-# ==============================================================================
-
-
-class ImageDB(object):
-    def __init__(self, base_url, shared_base_url=None):
-        self.api_helpers = [APIHelper(base_url)]
-        if shared_base_url is not None:
-            self.api_helpers.append(APIHelper(shared_base_url))
-        self.tmpdir = tempfile.mkdtemp()
-
-    def clear(self):
-        shutil.rmtree(self.tmpdir, ignore_errors=True)
-
-    def getURL(self, name, size=None):
-        """Return image URL for image with given name
-
-        @param name: image name (without namespace, i.e. without 'Image:')
-        @type name: unicode
-
-        @returns: URL to original image
-        @rtype: str
-        """
-
-        assert isinstance(name, unicode), 'name must be of type unicode'
-
-        for api_helper in self.api_helpers:
-            if size is None:
-                result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url')
-            else:
-                result = api_helper.page_query(titles='Image:%s' % name, prop='imageinfo', iiprop='url', iiurlwidth=str(size))
-            if result is not None:
-                break
-        else:
-            return None
-
-        try:
-            imageinfo = result['imageinfo'][0]
-            if size is not None and 'thumburl' in imageinfo:
-                url = imageinfo['thumburl']
-            else:
-                url = imageinfo['url']
-            if url:  # url can be False
-                if url.startswith('/'):
-                    url = urlparse.urljoin(self.api_helpers[0].base_url, url)
-                return url
-            return None
-        except (KeyError, IndexError):
-            return None
-
-    def getDiskPath(self, name, size=None):
-        """Return filename for image with given name and size
-
-        @param name: image name (without namespace, i.e. without 'Image:')
-        @type name: unicode
-
-        @param size: if given, the image is converted to the given maximum width
-        @type size: int or NoneType
-
-        @returns: filename of image or None if image could not be found
-        @rtype: basestring
-        """
-
-        assert isinstance(name, unicode), 'name must be of type unicode'
-
-        url = self.getURL(name, size=size)
-        if url is None:
-            return None
-
-        data = fetch_url(url, ignore_errors=True)
-        if not data:
-            return None
-
-        ext = url.rsplit('.')[-1]
-        if size is not None:
-            ext = '%dpx.%s' % (size, ext)
-        else:
-            ext = '.%s' % ext
-        filename = os.path.join(self.tmpdir, utils.fsescape(name + ext))
-        f = open(filename, 'wb')
-        f.write(data)
-        f.close()
-        return filename
-
-    def getLicense(self, name):
-        """Return license of image as stated on image description page
-
-        @param name: image name without namespace (e.g. without "Image:")
-        @type name: unicode
-
-        @returns: license of image, or None if no valid license could be found
-        @rtype: unicode
-        """
-
-        assert isinstance(name, unicode), 'name must be of type unicode'
-
-        for api_helper in self.api_helpers:
-            result = api_helper.page_query(titles='Image:%s' % name, prop='templates')
-            if result is not None:
-                break
-        else:
-            return None
-
-        try:
-            templates = [t['title'] for t in result['templates']]
-        except KeyError:
-            return None
-
-        for t in templates:
-            try:
-                return lower2normal[t.split(':', 1)[-1].lower()]
-            except KeyError:
-                pass
-
-        return None
-
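-# An end-to-end sketch for ImageDB (wiki URL and image name are example
-# values only):
-#
-#     imgdb = ImageDB('http://en.wikipedia.org/w/')
-#     path = imgdb.getDiskPath(u'Example.jpg', size=180)  # local file or None
-#     license = imgdb.getLicense(u'Example.jpg')
-#     imgdb.clear()  # removes the temporary download directory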
-
-# ==============================================================================
-
-
-class WikiDB(object):
-    print_template = u'Template:Print%s'
-
-    ip_rex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$')
-    bot_rex = re.compile(r'\bbot\b', re.IGNORECASE)
-
-    def __init__(self, base_url, license, template_blacklist=None):
-        """
-        @param base_url: base URL of a MediaWiki,
-            e.g. 'http://en.wikipedia.org/w/'
-        @type base_url: basestring
-
-        @param license: title of an article containing full license text
-        @type license: unicode
-
-        @param template_blacklist: title of an article containing blacklisted
-            templates (optional)
-        @type template_blacklist: unicode
-        """
-
-        self.base_url = base_url
-        self.license = license
-        self.api_helper = APIHelper(self.base_url)
-        self.template_cache = {}
-        self.template_blacklist = []
-        if template_blacklist is not None:
-            raw = self.getRawArticle(template_blacklist)
-            if raw is None:
-                log.error('Could not get template blacklist article %r' % template_blacklist)
-            else:
-                self.template_blacklist = [template.lower().strip()
-                                           for template in re.findall(r'\* *\[\[.*?:(.*?)\]\]', raw)]
-
-    def getURL(self, title, revision=None):
-        name = urllib.quote(title.replace(" ", "_").encode('utf-8'))
-        if revision is None:
-            return '%sindex.php?title=%s' % (self.base_url, name)
-        else:
-            return '%sindex.php?title=%s&oldid=%s' % (self.base_url, name, revision)
-
-    def getAuthors(self, title, revision=None, max_num_authors=10):
-        """Return at most max_num_authors names of non-bot, non-anon users for
-        non-minor changes of given article (before given revision).
-
-        @returns: list of principal authors
-        @rtype: [unicode]
-        """
-
-        result = self.api_helper.page_query(
-            titles=title,
-            redirects=1,
-            prop='revisions',
-            rvprop='user|ids|flags|comment',
-            rvlimit=500,
-        )
-        if result is None:
-            return None
-
-        try:
-            revs = result['revisions']
-        except KeyError:
-            return None
-
-        if revision is not None:
-            revision = int(revision)
-            revs = [r for r in revs if r['revid'] < revision]
-
-        authors = [r['user'] for r in revs
-                   if not r.get('anon')
-                   and not self.ip_rex.match(r['user'])
-                   and not r.get('minor')
-                   and not self.bot_rex.search(r.get('comment', ''))
-                   and not self.bot_rex.search(r['user'])
-                   ]
-        author2count = {}
-        for a in authors:
-            try:
-                author2count[a] += 1
-            except KeyError:
-                author2count[a] = 1
-        author2count = author2count.items()
-        author2count.sort(key=lambda a: -a[1])
-        return [a[0] for a in author2count[:max_num_authors]]
-
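-    # Sketch, assuming wikidb is a WikiDB instance: fetch up to five
-    # principal authors of an article (the title is an example value).
-    #
-    #     authors = wikidb.getAuthors(u'Physics', max_num_authors=5)
-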
-    def getTemplate(self, name, followRedirects=True):
-        """
-        Note: *Not* following redirects is unsupported!
-        """
-
-        try:
-            return self.template_cache[name]
-        except KeyError:
-            pass
-
-        if ":" in name:
-            name = name.split(':', 1)[1]
-
-        if name.lower() in self.template_blacklist:
-            log.info("ignoring blacklisted template: %r" % (name,))
-            return None
-
-        for title in (self.print_template % name, 'Template:%s' % name):
-            log.info("Trying template %r" % (title,))
-            c = self.getRawArticle(title)
-            if c is not None:
-                self.template_cache[name] = c
-                return c
-
-        return None
-
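-    # Sketch, assuming wikidb is a WikiDB instance: 'Infobox' is looked up
-    # as Template:PrintInfobox first, then Template:Infobox; blacklisted
-    # names yield None (the template name is an example value).
-    #
-    #     wikitext = wikidb.getTemplate(u'Infobox')
-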
-    def getRawArticle(self, title, revision=None):
-        if revision is None:
-            page = self.api_helper.page_query(titles=title, redirects=1, prop='revisions', rvprop='content')
-        else:
-            page = self.api_helper.page_query(revids=revision, prop='revisions', rvprop='content')
-        if page is None:
-            return None
-        if revision is not None and page['title'] != title:
-            # the given revision could point to another article!
-            return None
-        try:
-            return page['revisions'][0].values()[0]
-        except (KeyError, IndexError):
-            return None
-
-    def getMetaData(self):
-        result = self.api_helper.query(meta='siteinfo')
-        if result is None:
-            return None
-        try:
-            g = result['general']
-            return {
-                'license': {
-                    'name': g['rights'],
-                    'wikitext': self.getRawArticle(self.license),
-                },
-                'url': g['base'],
-                'name': '%s (%s)' % (g['sitename'], g['lang']),
-            }
-        except KeyError:
-            return None
-
-    def getParsedArticle(self, title, revision=None):
-        raw = self.getRawArticle(title, revision=revision)
-        if raw is None:
-            return None
-        a = uparser.parseString(title=title, raw=raw, wikidb=self)
-        return a
-
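-# A short usage sketch for WikiDB (URL and titles are example values):
-#
-#     wikidb = WikiDB('http://en.wikipedia.org/w/', license=u'Wikipedia:Copyrights')
-#     article = wikidb.getParsedArticle(u'Physics')  # parse tree or None
-#     meta = wikidb.getMetaData()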
-
-class Overlay(WikiDB):
-    def __init__(self, wikidb, templates):
-        self.__dict__.update(wikidb.__dict__)
-        self.overlay_templates = templates
-
-    def getTemplate(self, name, followRedirects=False):
-        try:
-            return self.overlay_templates[name]
-        except KeyError:
-            pass
-
-        return super(Overlay, self).getTemplate(name, followRedirects=followRedirects)
-
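-# A short Overlay sketch (template name and content are example values):
-# templates passed to Overlay take precedence over templates fetched from
-# the wiki.
-#
-#     overlay = Overlay(wikidb, {u'Infobox': u'{{{1}}}'})
-#     wikitext = overlay.getTemplate(u'Infobox')  # returns the overlay value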