Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/mwlib/netdb.py
diff options
context:
space:
mode:
Diffstat (limited to 'mwlib/netdb.py')
-rwxr-xr-xmwlib/netdb.py529
1 files changed, 529 insertions, 0 deletions
diff --git a/mwlib/netdb.py b/mwlib/netdb.py
new file mode 100755
index 0000000..ec38ee0
--- /dev/null
+++ b/mwlib/netdb.py
@@ -0,0 +1,529 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2008, PediaPress GmbH
+# See README.txt for additional licensing information.
+
+# An alternative solution to construct the hashpath of images would be to use
+# api.php, e.g.
+# fetch the page http://de.wikipedia.org/w/api.php?action=query&titles=Bild:SomePic.jpg&prop=imageinfo&iiprop=url&format=json
+
+import os
+import sys
+import urllib
+import urllib2
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import md5
+import shutil
+import sys
+import time
+import tempfile
+import re
+
+from mwlib import uparser, utils
+from mwlib.log import Log
+
# Module-wide logger used by all classes and helpers below.
log = Log("netdb")
+
+# ==============================================================================
+
def hashpath(name):
    """Compute hashpath for an image in the same way as MediaWiki does

    @param name: name of an image
    @type name: unicode

    @returns: hashpath to image
    @type: str
    """

    # MediaWiki normalizes image names: spaces become underscores and the
    # first character is uppercased ([:1] keeps the empty string safe).
    normalized = name.replace(' ', '_')
    normalized = normalized[:1].upper() + normalized[1:]
    digest = md5(normalized.encode('utf-8')).hexdigest()
    # Layout is <first hex digit>/<first two hex digits>/<normalized name>.
    return '%s/%s/%s' % (digest[0], digest[:2], normalized)
+
class ImageDB(object):
    """Fetch images per HTTP from one or more MediaWiki base URLs, convert
    them with ImageMagick and cache the converted files on disk.
    """

    convert_command = 'convert'  # name of/path to ImageMagick's convert tool

    def __init__(self, baseurl, cachedir=None, wikidb=None, knownLicenses=None):
        """Init ImageDB with a base URL (or a list of base URLs) and optionally
        with a cache directory.

        @param baseurl: base URL or sequence containing several base URLs
        @type baseurl: unicode or (unicode,)

        @param cachedir: image cache directory (optional)
        @type cachedir: basestring or None

        @param wikidb: WikiDB instance used to fetch image description pages to
            find out image licenses
        @type wikidb: object

        @param knownLicenses: list of known license templates (whose name is the
            name of the license) which may appear on image description pages
        @type knownLicenses: [unicode]
        """

        if isinstance(baseurl, unicode):
            self.baseurls = [baseurl.encode('ascii')]
        else:
            self.baseurls = []
            for bu in baseurl:
                if isinstance(bu, unicode):
                    bu = bu.encode('ascii')
                self.baseurls.append(bu)

        if cachedir:
            self.cachedir = cachedir
            self.tempcache = False
        else:
            self.cachedir = tempfile.mkdtemp()
            self.tempcache = True
        if self.cachedir[-1] != '/':
            self.cachedir += '/'  # needed for getPath() to work correctly

        self.wikidb = wikidb

        # Only build the license regexp when there is at least one known
        # license: an empty alternation would yield r'{{(?P<license>)}}',
        # which matches the literal string "{{}}" and reports an empty
        # license name.
        if knownLicenses:
            oredLicenses = '|'.join(['(%s)' % re.escape(license)
                                     for license in knownLicenses])
            self.licenseRegexp = re.compile(r'{{(?P<license>%s)}}' % oredLicenses)
        else:
            self.licenseRegexp = None

        # cache: image name -> license found on its description page
        self.name2license = {}

    def clear(self):
        """Delete temporary cache directory (i.e. only if no cachedir has been
        passed to __init__()).
        """

        if self.tempcache:
            shutil.rmtree(self.cachedir)

    def getURL(self, name, size=None):
        """Return image URL for image with given name

        @param name: image name (without namespace, i.e. without 'Image:')
        @type name: unicode

        @returns: URL to original image, or None if it could not be fetched
        @rtype: str or NoneType
        """

        assert isinstance(name, unicode), 'name must be of type unicode'

        # use getDiskPath() to fetch and cache (!) image
        path = self.getDiskPath(name, size=size)
        if path is None:
            return None

        # Find the base URL whose cache directory contains the cached file
        # and reconstruct the image URL from it.
        for baseurl in self.baseurls:
            urldir = self._getCacheDirForBaseURL(baseurl)
            if not path.startswith(urldir):
                continue
            return self._getImageURLForBaseURL(baseurl, name)

    def getPath(self, name, size=None):
        """Return path to image with given parameters relative to cachedir"""

        path = self.getDiskPath(name, size=size)
        if path is None:
            return None
        assert path.startswith(self.cachedir), 'invalid path from getDiskPath()'
        return path[len(self.cachedir):]

    def getDiskPath(self, name, size=None):
        """Return filename for image with given name. If the image is not found
        in the cache, it is fetched per HTTP and converted.

        @param name: image name (without namespace, i.e. without 'Image:')
        @type name: unicode

        @param size: if given, the image is converted to the given maximum size
            (i.e. the image is scaled so that neither its width nor its height
            exceed size)
        @type size: int or NoneType

        @returns: filename of image or None
        @rtype: basestring or NoneType
        """

        assert isinstance(name, unicode), 'name must be of type unicode'

        path = self._getImageFromCache(name, size=size)
        if path:
            return path

        tmpfile, baseurl = self._fetchImage(name)
        if tmpfile is None:
            return None

        self.name2license[name] = self._fetchLicense(baseurl, name)

        path = self._convertToCache(tmpfile, baseurl, name, size=size)

        try:
            os.unlink(tmpfile)
        except OSError:
            # os.unlink() raises OSError (the original "except IOError" never
            # caught the failure).
            log.warn('Could not delete temp file %r' % tmpfile)

        return path

    def _fetchLicense(self, baseurl, name):
        """Return the license found on the image description page, or None."""

        # No wikidb or no known licenses => nothing to look up.
        if self.wikidb is None or self.licenseRegexp is None:
            return None

        raw = self.wikidb.getImageDescription(name,
            urlIndex=self.baseurls.index(baseurl),
        )
        if raw is None:
            return None

        mo = re.search(self.licenseRegexp, raw)
        if mo is None:
            return None

        return mo.group('license')

    def getLicense(self, name):
        """Return license of image as stated on image description page

        @param name: image name without namespace (e.g. without "Image:")
        @type name: unicode

        @returns: license of image or None, if no valid license could be found
        @rtype: str
        """

        assert isinstance(name, unicode), 'name must be of type unicode'
        return self.name2license.get(name)

    def _getImageFromCache(self, name, size=None):
        """Look in cachedir for an image with the given parameters"""

        for baseurl in self.baseurls:
            path = self._getCachedImagePath(baseurl, name, size=size)
            if path is not None and os.path.exists(path):
                return path
        return None

    def _getCacheDirForBaseURL(self, baseurl):
        """Construct the path of the cache directory for the given base URL.
        This directory doesn't need to exist.
        """

        return os.path.join(self.cachedir,
                            md5(baseurl.encode('utf-8')).hexdigest()[:8])

    def _getCachedImagePath(self, baseurl, name, size=None, makedirs=False):
        """Construct a filename for an image with the given parameters inside
        the cache directory. The file doesn't need to exist. If makedirs is True
        create all directories up to filename.
        """

        urlpart = self._getCacheDirForBaseURL(baseurl)
        if size is not None:
            sizepart = '%dpx' % size
        else:
            sizepart = 'orig'

        # SVG and GIF images are converted to PNG (see _convertToCache), so
        # the cached file carries an additional .png extension.
        if name.lower().endswith('.svg'):
            if size is None:
                log.warn('Cannot get SVG image when no size is given')
                return None
            name += '.png'
        if name.lower().endswith('.gif'):
            name += '.png'
        name = (name[0].upper() + name[1:]).replace(' ', '_').replace("'", "-")

        d = os.path.join(urlpart, sizepart)
        if makedirs and not os.path.isdir(d):
            os.makedirs(d)
        return os.path.join(d, utils.fsescape(name))

    def _fetchImage(self, name):
        """Fetch image with given name in original (i.e. biggest) size per HTTP.

        @returns: filename of written image and base URL used to retrieve the
            image or (None, None) if the image could not be fetched
        @rtype: (basestring, str) or (NoneType, NoneType)
        """

        for baseurl in self.baseurls:
            path = self._fetchImageFromBaseURL(baseurl, name)
            if path:
                return path, baseurl
        return None, None

    def _getImageURLForBaseURL(self, baseurl, name):
        """Construct a URL for the image with given name under given base URL"""

        hp = hashpath(name).encode('utf-8')
        return urllib.basejoin(baseurl, urllib.quote(hp))

    def _fetchImageFromBaseURL(self, baseurl, name):
        """Fetch image with given name under given baseurl and write it to a
        tempfile.

        @returns: filename of written image or None if image could not be fetched
        @rtype: basestring or NoneType
        """

        url = self._getImageURLForBaseURL(baseurl, name)
        log.info("fetching %r" % (url,))
        opener = urllib2.build_opener()
        opener.addheaders = [('User-agent', 'mwlib')]
        try:
            data = opener.open(url).read()
            log.info("got image: %r" % url)
            fd, filename = tempfile.mkstemp()
            os.write(fd, data)
            os.close(fd)
            return filename
        except urllib2.URLError as err:
            log.error("%s - while fetching %r" % (err, url))
            return None

    def _convertToCache(self, srcfile, baseurl, name, size=None):
        """Convert image in file named srcfile to have the given maximum size.
        Save the converted image in the cache directory for the given baseurl.

        @returns: filename of converted image or None on conversion failure
        @rtype: basestring or NoneType
        """

        destfile = self._getCachedImagePath(baseurl, name, size=size, makedirs=True)
        if size is not None:
            # ">" only shrinks images larger than size x size, never enlarges.
            thumbnail = '-thumbnail "%dx%d>"' % (size, size)
        else:
            thumbnail = '-strip'

        opts = '-background white -density 100 -flatten -coalesce %(thumbnail)s' % {
            'thumbnail': thumbnail,
        }
        # [0] selects the first frame/page of multi-frame images (e.g. GIF).
        # srcfile comes from tempfile.mkstemp() and destfile is built from a
        # sanitized name (quotes replaced above), so the shell quoting is safe.
        cmd = "%(convert)s %(opts)s '%(src)s[0]' '%(dest)s'" % {
            'convert': self.convert_command,
            'opts': opts,
            'src': srcfile,
            'dest': destfile,
        }
        log.info('executing %r' % cmd)
        rc = utils.shell_exec(cmd)
        if rc != 0:
            log.error('Could not convert %r: convert returned %d' % (name, rc))
            return None

        return destfile
+
+
+# ==============================================================================
+
def normname(name):
    """Normalize a page name: strip surrounding whitespace, turn underscores
    into spaces and uppercase the first character.
    """
    cleaned = name.strip().replace("_", " ")
    return cleaned[:1].upper() + cleaned[1:]
+
+
class NetDB(object):
    """WikiDB that fetches articles, templates and image description pages
    in raw wikitext format per HTTP from configurable URL patterns.
    """

    # Matches "#Redirect [[Target]]" at the start of a page (case-insensitive).
    redirect_rex = re.compile(r'^#Redirect:?\s*?\[\[(?P<redirect>.*?)\]\]',
                              re.IGNORECASE)

    def __init__(self, pagename,
        imagedescriptionurls=None,
        templateurls=None,
        templateblacklist=None,
        defaultauthors=None,
    ):
        """
        @param pagename: URL to page in wikitext format. @TITLE@ gets replaced
            with the page name and @REVISION@ gets replaced with the requested
            revision/oldid. E.g.

                "http://mw/index.php?title=@TITLE@&action=raw&oldid=@REVISION@"

        @type pagename: str

        @param imagedescriptionurls: list of URLs to image description pages in
            wikitext format. @TITLE@ gets replaced with the image title w/out
            its prefix. E.g.

                ["http://mw/index.php?title=Image:@TITLE@s&action=raw"]

            The list must be of the same length as the baseurl list of the
            accompanying ImageDB, and the URL with the corresponding position
            in the list is used to retrieve the description page.
        @type imagedescriptionurls: [str]

        @param templateurls: list of URLs to template pages in wikitext format.
            @TITLE@ gets replaced with the template title. E.g.

                ["http://mw/index.php?title=Template:@TITLE@s&action=raw"]

            If more than one URL is specified, URLs are tried in given order.
        @type templateurls: [str]

        @param templateblacklist: URL of a page listing blacklisted templates
        @type templateblacklist: str or NoneType

        @param defaultauthors: list of default (principal) authors for articles
        @type defaultauthors: [unicode]
        """

        # Turn "@TITLE@"/"@REVISION@" placeholders into %-format fields;
        # literal "%" must be escaped first.
        self.pagename = pagename.replace("%", "%%").replace("@TITLE@", "%(NAME)s").replace("@REVISION@", "%(REVISION)s")

        if templateurls is None:
            templateurls = []
        self.templateurls = [x.replace("%", "%%").replace("@TITLE@", "%(NAME)s")
                             for x in templateurls]

        if imagedescriptionurls is None:
            imagedescriptionurls = []
        self.imagedescriptionurls = [x.replace("%", "%%").replace("@TITLE@", "%(NAME)s")
                                     for x in imagedescriptionurls]

        if templateblacklist:
            self.templateblacklist = self._readTemplateBlacklist(templateblacklist)
        else:
            self.templateblacklist = []

        if defaultauthors:
            self.defaultauthors = defaultauthors
        else:
            self.defaultauthors = []

        # URL -> page content cache used by _getpage()
        self.pages = {}

    def _getpage(self, url, expectedContentType='text/x-wiki'):
        """Fetch url per HTTP, caching the result. Return the raw page data
        (str) or None if the content type doesn't match expectedContentType.
        """

        try:
            return self.pages[url]
        except KeyError:
            pass

        stime = time.time()
        response = urllib.urlopen(url)
        data = response.read()
        log.info('fetched %r in %ss' % (url, time.time() - stime))

        if expectedContentType:
            ct = response.info().gettype()
            if ct != expectedContentType:
                log.warn('Skipping page %r with content-type %r (%r was expected).'
                         % (url, ct, expectedContentType))
                return None

        self.pages[url] = data
        return data

    def _readTemplateBlacklist(self, templateblacklist):
        """Fetch the blacklist page and return the lowercased template names
        listed on it ("* [[Prefix:Name]]" lines). Returns [] on any error.
        """

        if not templateblacklist:
            return []
        try:
            content = urllib.urlopen(templateblacklist).read()
            return [template.lower().strip()
                    for template in re.findall(r'\* *\[\[.*?:(.*?)\]\]', content)]
        except Exception:  # fixme: more sensible error handling...
            # Deliberate best-effort: a missing/broken blacklist must not
            # abort processing. (Was a bare "except:" which also swallowed
            # KeyboardInterrupt/SystemExit.)
            log.error('Error fetching template blacklist from url:', templateblacklist)
            return []

    def _dummy(self, *args, **kwargs):
        """No-op placeholder for optional WikiDB interface methods."""
        pass

    startCache = _dummy

    def getURL(self, title, revision=None):
        """Return the URL for fetching the given article revision."""
        name = urllib.quote(title.replace(" ", "_").encode('utf8'))
        if revision is None:
            return self.pagename % dict(NAME=name, REVISION='0')
        else:
            return self.pagename % dict(NAME=name, REVISION=revision)

    def getAuthors(self, title, revision=None):
        """Return a copy of the configured default author list."""
        return list(self.defaultauthors)

    def title2db(self, title):
        """Encode a unicode title to its UTF-8 database representation."""
        assert isinstance(title, unicode), 'title must be of type unicode'
        return title.encode('utf-8')

    def db2title(self, dbtitle):
        """Decode a UTF-8 database title back to unicode."""
        assert isinstance(dbtitle, str), 'dbtitle must be of type str'
        return unicode(dbtitle, 'utf-8')

    def getImageDescription(self, title, urlIndex=0):
        """Fetch the image description page for the image with the given title.

        @param title: title of the image w/out prefix (like Image:)
        @type title: unicode

        @param urlIndex: index for imagedescriptionurls
        @type urlIndex: int

        @returns: wikitext of image description page or None
        @rtype: unicode or None
        """

        if not self.imagedescriptionurls:
            return None

        raw = self._getpage(self.imagedescriptionurls[urlIndex] % {
            'NAME': urllib.quote(title.replace(" ", "_").encode('utf8')),
        })
        if raw is None:
            return None

        return unicode(raw, 'utf-8')

    def getTemplate(self, name, followRedirects=False):
        """Fetch the wikitext of the template with the given name, trying all
        configured template URLs in order. Redirects are resolved recursively.

        NOTE(review): the followRedirects parameter is currently ignored;
        redirects are always followed.
        """

        if ":" in name:
            # Strip a namespace prefix like "Template:".
            name = name.split(':', 1)[1]

        if name.lower() in self.templateblacklist:
            log.info("ignoring blacklisted template:", repr(name))
            return None
        name = urllib.quote(name.replace(" ", "_").encode('utf8'))
        for u in self.templateurls:
            url = u % dict(NAME=name)
            log.info("Trying %r" % (url,))
            c = self._getpage(url)
            if c:
                log.info("got content from", url)
                res = unicode(c, 'utf8')
                mo = self.redirect_rex.search(res)
                if mo:
                    redirect = mo.group('redirect')
                    redirect = normname(redirect.split("|", 1)[0].split("#", 1)[0])
                    return self.getTemplate(redirect)
                return res

    def getRawArticle(self, title, revision=None):
        """Return the raw wikitext of an article or None if unavailable."""
        r = self._getpage(self.getURL(title, revision=revision))
        if r is None:
            return None
        return unicode(r, 'utf8')

    def getRedirect(self, title):
        """Articles fetched per HTTP never expose a redirect target here."""
        return u""

    def getParsedArticle(self, title, revision=None):
        """Fetch and parse an article; return the parse tree or None."""
        raw = self.getRawArticle(title, revision=revision)
        if raw is None:
            return None
        a = uparser.parseString(title=title, raw=raw, wikidb=self)
        return a
+
+
class Overlay(NetDB):
    """NetDB variant that serves templates from an in-memory mapping first,
    falling back to the wrapped NetDB for everything else.
    """

    def __init__(self, wikidb, templates):
        # Adopt the complete state of the existing NetDB instance instead of
        # re-running NetDB.__init__().
        self.__dict__.update(wikidb.__dict__)
        self.overlay_templates = templates

    def getTemplate(self, name, followRedirects=False):
        """Return the overlay template for name, or delegate to NetDB."""
        if name in self.overlay_templates:
            return self.overlay_templates[name]
        return super(Overlay, self).getTemplate(name, followRedirects=followRedirects)
+