diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-12 21:14:06 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-12 21:14:06 (GMT) |
commit | f577c2c142c7648a482e0eec7ecd736c1ca716d7 (patch) | |
tree | 259c5cf191116379e97d8aebc260f9664ad3a0e5 | |
parent | d7092a126f230f22344b50d79b8bd362d659953b (diff) |
checkpoint new branch before the property type/xapian field merge
-rwxr-xr-x | bin/index-service | 173 | ||||
-rw-r--r-- | etc/org.laptop.sugar.Indexer.service.in | 3 | ||||
-rw-r--r-- | src/olpc/datastore/query.py | 642 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 271 | ||||
-rwxr-xr-x | tests/cleaner.py | 39 | ||||
-rw-r--r-- | tests/test_xapianindex.py | 91 | ||||
-rw-r--r-- | tests/xapianindex.txt | 76 |
7 files changed, 477 insertions, 818 deletions
diff --git a/bin/index-service b/bin/index-service deleted file mode 100755 index a2ff83c..0000000 --- a/bin/index-service +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/env python - -""" Async index service for the Datastore. - -Subscribes to the create/update/delete messages of the Datastore and -performs the indexing. When this service is enabled the Datastore -access the Xapian repository in read only mode. -""" - - -try: from ore.main import Application -except ImportError: Application = object - -from olpc.datastore.datastore import DS_SERVICE, DS_OBJECT_PATH -from olpc.datastore.datastore import DS_DBUS_INTERFACE -from olpc.datastore.indexer import Indexer -import dbus -import dbus.mainloop.glib -import logging -import sys -import os -import signal - -profile = os.environ.get('SUGAR_PROFILE', 'default') -base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) -repo_dir = os.path.join(base_dir, 'datastore') -fulltext_dir = os.path.join(repo_dir, 'fulltext') - -log_dir = os.path.join(base_dir, "logs") -if not os.path.exists(log_dir): os.makedirs(log_dir) - -os.chdir(repo_dir) - -# setup logger -filename = None -if not sys.stdin.isatty(): - filename = os.path.join(log_dir, "indexer.log") -logging.basicConfig(level=logging.DEBUG, - format="%(asctime)-15s %(levelname)s: %(message)s", - filename = filename, - ) - -logger = logging.getLogger('org.laptop.sugar.Indexer') -logger.setLevel(logging.DEBUG) - -class IndexService(Application): - def manage_options(self): - self.parser.add_option("--olpc.fulltext.repo", - dest="fulltext_dir", - action="store", default='fulltext', - help="""Location of the FullText Repository""") - - - def main(self): - logging.debug('Starting the index service at %s' % self.options.fulltext_dir) - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - bus = dbus.SessionBus() - self.fulltext = Indexer(self.options.fulltext_dir) - self.fulltext.use_fulltext = True - - ds = bus.get_object(DS_SERVICE, DS_OBJECT_PATH) - self.ds = 
dbus.Interface(ds, dbus_interface=DS_DBUS_INTERFACE) - - self.ds.connect_to_signal("Created", self.created, - dbus_interface=DS_DBUS_INTERFACE) - - self.ds.connect_to_signal("Updated", self.updated, - dbus_interface=DS_DBUS_INTERFACE) - - self.ds.connect_to_signal("Deleted", self.deleted, - dbus_interface=DS_DBUS_INTERFACE) - - - self.ds.connect_to_signal("Stopped", self.stopped, - dbus_interface=DS_DBUS_INTERFACE) - - self.eventloop.run() - - def get_textprops(self, uid): - # text properties also get full text indexing - # currently this is still searched with the 'fulltext' - # parameter of find() - textprops = {} - for k,v in self.ds.get_properties(uid, dict(type='text')).items(): - textprops[str(k)] = v and str(v) or '' - return textprops - - def created(self, uid): - """An object was created on the bus and we want to index it""" - # because the file isn't encoded anywhere accessible in the - # create call we must actually get the filename and trigger - # the indexing on that - filename = self.ds.get_filename(uid) - r = None - if filename: - mime_type = self.ds.get_properties(uid, {}).get('mime_type', None) - r = self.fulltext.fulltext_index(uid, filename, mime_type, - self.get_textprops(uid)) - if r is True: - logger.debug("index creation of %s" % uid) - elif r is False: - logger.debug("unable to index creation of %s" % uid) - else: - logger.debug("nothing to index on creation of %s" % uid) - - def updated(self, uid): - """An object was updated on the bus and we want to index it""" - # because the file isn't encoded anywhere accessible in the - # create call we must actually get the filename and trigger - # the indexing on that - filename = self.ds.get_filename(uid) - r = None - if filename: - mime_type = self.ds.get_properties(uid, {}).get('mime_type', - None) - r = self.fulltext.fulltext_index(uid, filename, mime_type, - self.get_textprops(uid)) - if r is True: - logger.debug("index update of %s" % uid) - elif r is False: - logger.debug("unable to index 
update of %s" % uid) - else: - logger.debug("nothing to index on update of %s" % uid) - - - def deleted(self, uid): - """An object was updated on the bus and we want to index it""" - # because the file isn't encoded anywhere accessible in the - # create call we must actually get the filename and trigger - # the indexing on that - try: - self.fulltext.fulltext_unindex(uid) - logger.debug("unindex deletion of %s" % uid); - except KeyError: pass - - - def stopped(self): - """Respond to the datastore being stopped by shutting down - ourselves""" - self.fulltext.stop() - self.eventloop.quit() - - -if __name__ == "__main__": - def handle_shutdown(signum, frame): - idx.stopped() - print "shutdown cleanly" - raise SystemExit("Shutting down on signal %s" % signum) - - signal.signal(signal.SIGHUP, handle_shutdown) - signal.signal(signal.SIGTERM, handle_shutdown) - - idx = IndexService() - #idx() - # w/o ore.main - - import gobject - idx.eventloop = gobject.MainLoop() - class options(object): pass - o = options() - o.fulltext_dir = 'fulltext' - idx.options = o - try: - idx.main() - except: - # force logging this one - logger.setLevel(logging.DEBUG) - logger.debug("Problem in index service", - exc_info=sys.exc_info()) - idx.stopped() - - - diff --git a/etc/org.laptop.sugar.Indexer.service.in b/etc/org.laptop.sugar.Indexer.service.in deleted file mode 100644 index fb0a7ec..0000000 --- a/etc/org.laptop.sugar.Indexer.service.in +++ /dev/null @@ -1,3 +0,0 @@ -[D-BUS Service] -Name = org.laptop.sugar.Indexer -Exec = @bindir@/index-service diff --git a/src/olpc/datastore/query.py b/src/olpc/datastore/query.py deleted file mode 100644 index 2c5dd9f..0000000 --- a/src/olpc/datastore/query.py +++ /dev/null @@ -1,642 +0,0 @@ -""" -olpc.datastore.query -~~~~~~~~~~~~~~~~~~~~ -manage the metadata index and make it queryable. this in turn will -depend on olpc.datastore.fulltext which indexes the actual content. 
- -""" - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - - -from datetime import datetime -from lemur.xapian.sei import DocumentStore, DocumentPiece, SortableValue -from olpc.datastore.converter import converter -from olpc.datastore.model import DateProperty, TextProperty -from olpc.datastore.model import Model, Content, Property, propertyByKind -from olpc.datastore.utils import create_uid - -from sqlalchemy import create_engine, BoundMetaData -from sqlalchemy import select, intersect, and_ -import atexit -import logging -import os, sys - -_marker = object() - - -class SugarDomain(object): - """The underlying property set used for metadata in the sugar - system""" - def kind_by_key(self, key): - """resolves property names to the factory type that supports - them in the model - """ - # key may be a two part form directly indicating the property - # type - if ':' in key: - key, kind = key.split(':', 1) - # now resolve the kind to a property class - return key, propertyByKind(kind) - - return key, { - 'ctime' : DateProperty, - 'mtime' : DateProperty, - 'author' : Property, - 'title' : TextProperty, - 'mime_type' : Property, - 'language' : Property, - }.get(key, Property) - - def propertyFactory(self, key, value='', dict=None): - key, kind = self.kind_by_key(key) - p = kind(key, value) - if dict is not None: dict[key] = p - return kind - - def _automaticProperties(self): - d = {} - now = datetime.now() - self.propertyFactory('mtime', now, dict=d) - return d - - def _defaultProperties(self): - d = {} - now = datetime.now() - self.propertyFactory('ctime', now, dict=d) - self.propertyFactory('author', dict=d) - self.propertyFactory('title', dict=d) - self.propertyFactory('mime_type', dict=d) - self.propertyFactory('language', dict=d) - - d.update(self._automaticProperties()) - return d - - def _normalizeProps(self, props, 
creating, include_defaults): - # return a dict of {name : property} - if isinstance(props, dict): - # convert it into a dict of Property objects - d = {} - for k,v in props.iteritems(): - k, kind = self.kind_by_key(k) - p = kind(k, v) - d[k] = p - if creating and include_defaults: - defaults = self._defaultProperties() - for k, v in defaults.iteritems(): - if k not in d: d[k] = v - props = d - else: - d = {} - for p in props: - d[p.key] = p - props = d - return props - - - -class QueryManager(SugarDomain): - FULLTEXT_NAME = "fulltext" - - def __init__(self, metadata_uri, **options): - """ - The metadata_uri is a string used to find the database. - - - This will check keywords for: - 'language' Language is the language code used in the fulltext - engine. This helps improve stemming and - so on. In the future additional control - will be provided. - - 'sync_index' which determines if we use an internal - sync index impl or an out of process one - via DBus. If the async process is to be - used it must be properly configured and - available for DBus to spawn. 
- - 'fulltext_repo' the full filepath to which the fulltext - index data will be stored - - 'use_fulltext' when true indexing will be performed - - """ - self.uri = metadata_uri - self.options = options - - self.backingstore = None - self.content_ext = None - - - def _handle_option(self, options, key, default=_marker): - value = options.get(key, default) - if value is _marker: raise KeyError(key) - setattr(self, key, value) - - def _handle_options(self, **kwargs): - self._handle_option(kwargs, 'fulltext_repo') - self._handle_option(kwargs, 'use_fulltext', True) - self._handle_option(kwargs, 'sync_index', True) - self._handle_option(kwargs, 'language', 'en') - self.sync_index = self.use_fulltext and self.sync_index - - def bind_to(self, backingstore): - self.backingstore = backingstore - - def prepare_index(self): - self.connect_db() - self.prepare_db() - self.connect_model() - - def prepare_fulltext(self): - self.connect_fulltext(self.fulltext_repo, self.language, - read_only=not self.sync_index) - - def prepare(self): - """This is called by the datastore with its backingstore and - querymanager. Its assumed that querymanager is None and we are - the first in this release - """ - self._handle_options(**self.options) - self.prepare_index() - self.prepare_fulltext() - return True - - def stop(self): - pass - - # Primary interface - def create(self, props, filelike=None, include_defaults=True): - """Props can either be a dict of k,v pairs or a sequence of - Property objects. - - The advantage of using property objects is that the data can - by typed. When k/v pairs are used a default string type will - be chosen. - - When include_defaults is True a default set of properties are - created on behalf of the Content if they were not provided. 
- - These include: - author : '' - title : '' - mime_type : '' - language : '', - ctime : '', - mtime : '', - """ - s = self.model.session - c = Content() - # its important the id be set before other operations - c.id = create_uid() - s.save(c) - - self._bindProperties(c, props, creating=True, include_defaults=include_defaults) - s.flush() - c.backingstore = self.backingstore - - if self.sync_index and filelike: - self.fulltext_index(c.id, filelike, - mimetype=c.get_property('mime_type'), - textprops=self.get_textprops(c)) - - return c - - def update(self, content_or_uid, props=None, filelike=None): - content = self._resolve(content_or_uid) - content.backingstore = self.backingstore - if props is not None: - self._bindProperties(content, props, creating=False) - self.model.session.flush() - - if self.sync_index and filelike: - self.fulltext_index(content.id, filelike, textprops=self.get_textprops(content)) - - - def _bindProperties(self, content, props, creating=False, include_defaults=False): - """Handle either a dict of properties or a list of property - objects, binding them to the content instance. 
- """ - # for information on include_defaults see create() - # default properties are only provided when creating is True - session = self.model.session - - props = self._normalizeProps(props, creating, - include_defaults) - - # we should have a dict of property objects - if creating: - content.properties.extend(props.values()) - else: - # if the automatically maintained properties (like mtime) - # are not set, include them now - auto = self._automaticProperties() - auto.update(props) - props = auto - # we have to check for the update case - oldProps = dict([(p.key, p) for p in content.properties]) - for k, p in props.iteritems(): - if k in oldProps: - oldProps[k].value = p.value - oldProps[k].type = p.type - else: - content.properties.append(p) - - def get(self, uid): - return self.model.session.query(self.model.mappers['content']).get(uid) - - def get_properties(self, content_or_uid, keys): - c = self._resolve(content_or_uid) - return self.model.session.query(Property).select_by(self.model.property.c.key.in_(keys), - content_id=c.id) - - - def get_uniquevaluesfor(self, propertyname): - properties = self.model.tables['properties'] - return [r[0] for r in select([properties.c.value], - properties.c.key==propertyname, - distinct=True).execute().fetchall()] - - - - def delete(self, content_or_uid): - c = self._resolve(content_or_uid) - s = self.model.session - s.delete(c) - s.flush() - if self.sync_index: - self.fulltext_unindex(c.id) - - - def find(self, query=None, **kwargs): - """ - dates can be search in one of two ways. - date='YYYY-MM-DD HH:MM:SS' - date={'start' : 'YYYY-MM-DD HH:MM:SS', - 'end' : 'YYYY-MM-DD HH:MM:SS' - } - where date is either ctime or mtime. - if start or end is omitted its becomes a simple before/after - style query. If both are provided its a between query. - - providing the key 'fulltext' will include a full text search - of content matching its parameters. see fulltext_search for - additional details. 
- - - If 'limit' is passed it will be the maximum number of results - to return and 'offset' will be the offset from 0 into the - result set to return. - - """ - - # XXX: this will have to be expanded, but in its simplest form - if not self.sync_index: self.index.reopen() - - s = self.model.session - properties = self.model.tables['properties'] - if not query: query = {} - query.update(kwargs) - q = s.query(Content) - # rewrite the query to reference properties - # XXX: if there is a 'content' key will will have to search - # the content using the full text index which will result in a - # list of id's which must be mapped into the query - # fulltext_threshold is the minimum acceptable relevance score - limit = query.pop('limit', None) - offset = query.pop('offset', None) - - if offset: q = q.offset(offset) - if limit: q = q.limit(limit) - - if query: - where = [] - fulltext = query.pop('fulltext', None) - threshold = query.pop('fulltext_threshold', 60) - - - - statement = None - ft_select = None - - if query: - # daterange support - # XXX: this is sort of a hack because - # - it relies on Manifest typing in sqlite - # - value's type is not normalized - # - we make special exception based on property name - # if we need special db handling of dates ctime/mtime - # will become columns of Content and not properties - ctime = query.pop('ctime', None) - mtime = query.pop('mtime', None) - if ctime or mtime: - self._query_dates(ctime, mtime, where) - for k,v in query.iteritems(): - if isinstance(v, list): - v = properties.c.value.in_(*v) - else: - v = properties.c.value==v - - where.append(select([properties.c.content_id], - and_( properties.c.key==k, - v))) - - statement = intersect(*where) - statement.distinct=True - - if fulltext and self.use_fulltext: - # perform the full text search and map the id's into - # the statement for inclusion - ft_res = self.fulltext_search(fulltext) - if ft_res: - ft_ids = [ft[0] for ft in ft_res if ft[1] >= - threshold] - - if ft_ids: - 
ft_select = select([properties.c.content_id], - properties.c.content_id.in_(*ft_ids)) - - if ft_select is None: - # the full text query eliminated the possibility - # of results by returning nothing under a logical - # AND condition, bail now - return ([], 0) - else: - if statement is None: - statement = ft_select - statement.distinct = True - else: - statement = intersect(statement, ft_select) - - result = statement.execute() - r = [q.get(i[0]) for i in result] - r = (r, len(r)) - else: - r = (q.select(), q.count()) - - # XXX: make sure the proper backingstore is mapped - # this currently forbids the use case of keeping index data - # for a read-only store. - for item in r[0]: - item.backingstore = self.backingstore - - return r - - # sqla util - def _resolve(self, content_or_uid): - if isinstance(content_or_uid, basestring): - # we need to resolve the object - content_or_uid = self.model.session.query(Content).get(content_or_uid) - return content_or_uid - - def _query_dates(self, ctime, mtime, selects): - if ctime: selects.append(self._query_date('ctime', ctime)) - if mtime: selects.append(self._query_date('mtime', mtime)) - - def _query_date(self, key, date): - properties = self.model.properties - - if isinstance(date, basestring): - s = select([properties.c.content_id], - and_( properties.c.key==key, - properties.c.value==date)) - else: - # its a dict with start/end - start = date.get('start') - end = date.get('end') - if start and end: - s = select([properties.c.content_id], - and_( properties.c.key==key, - properties.c.value.between(start, - end))) - elif start: - s = select([properties.c.content_id], - and_( properties.c.key==key, - properties.c.value >=start)) - else: - s = select([properties.c.content_id], - and_( properties.c.key==key, - properties.c.value < end)) - - return s - - - def get_textprops(self, uid_or_content): - # text properties also get full text indexing - # currently this is still searched with the 'fulltext' - # parameter of find() - 
content = self._resolve(uid_or_content) - textprops = {} - for p in content.get_properties(type='text'): - textprops[p.key] = p.value and p.value or '' - return textprops - - - # fulltext interface - def fulltext_index(self, uid, fileobj, mimetype=None, textprops=None): - """Index the fileobj relative to uid which should be a - olpc.datastore.model.Content object's uid. The fileobj can be - either a pathname or an object implementing the Python file - ('read') interface. - """ - pass - - def fulltext_unindex(self, content_id): - pass - - def fulltext_search(self, *args, **kwargs): - return [] - - # lifecycle - def connect_db(self): - """Connect to the underlying database. Called implicitly by - __init__""" - pass - - - def prepare_db(self): - """After connecting to the metadata database take any - initialization steps needed for the environment. - - This is called implicitly by __init__ before the model is - brought online. - """ - pass - - def connect_model(self, model): - """Connect the model. Called with the model passed into - __init__ after the database has been prepared. - """ - pass - - def connect_fulltext(self, repo, language, read_only): - """Connect the full text index""" - pass - - -class SQLiteQueryManager(QueryManager): - """The default implementation of the query manager. 
This owns the - model object and the fulltext object - """ - - def __init__(self, uri, **kwargs): - super(SQLiteQueryManager, self).__init__(uri, **kwargs) - # now re-write the URI to be sqlite specific - # (we were initialized to a namepattern in the proper - # directory by the backingstore) - self.uri= "sqlite:///%s.db" % self.uri - - def connect_db(self): - self.db = create_engine(self.uri) - self.metadata = BoundMetaData(self.db) - - def prepare_db(self): - # Using the sqlite backend we can tune the performance to - # limit writes as much as possible - if self.db.name.startswith('sqlite'): - connection = self.db.connect() - # cut down of per-activity file locking writes - connection.execute("PRAGMA locking_mode=EXCLUSIVE") - # don't demand fsync -- if this is too dangerous - # we can change it to normal which is still less writey - # than the default FULL - connection.execute("PRAGMA synchronous=OFF") - # temporary tables and indices are kept in memory - connection.execute("PRAGMA temp_store=MEMORY") - # XXX: what is the ideal jffs2 page size - # connection.execute("PRAGMA page_size 4096") - - def connect_model(self, model=None): - if model is None: model = Model() - # take the model and connect it to us - model.prepare(self) - - # make sure all the tables and indexes exist - self.metadata.create_all() - - self.model = model - - - def stop(self): - # clean up - self.db.dispose() - -# Full text support -def flatten_unicode(value): return value.encode('utf-8') - -class XapianBinaryValue(SortableValue): - def __init__(self, value, field_name="content"): - SortableValue.__init__(self, value, field_name) - -class XapianFulltext(object): - def connect_fulltext(self, repo, language='en', read_only=True): - if not os.path.exists(repo) and read_only is True: - # create the store - index = DocumentStore(repo, language, read_only=False) - index.close() - # and abandon it - self.index = DocumentStore(repo, language, read_only=read_only) - 
self.index.registerFlattener(unicode, flatten_unicode) - atexit.register(self.index.close) - - def fulltext_index(self, uid, fileobj, mimetype=None, textprops=None): - """Index the fileobj relative to uid which should be a - olpc.datastore.model.Content's uid. The fileobj can be either - a pathname or an object implementing the Python file ('read') - interface. - """ - piece = DocumentPiece - if isinstance(fileobj, basestring): - # treat it as a pathname - # use the global converter to try to get text from the - # file - fp = converter(fileobj, mimetype=mimetype) - #piece = XapianBinaryValue - elif hasattr(fileobj, 'read'): - # this is an off case, we have to assume utf-8 data - logging.debug("Indexing from readable, not filename") - fp = fileobj - else: - raise ValueError("Not a valid file object") - - if fp is None: - # for whatever reason we were unable to get the content - # into an indexable form. - logging.debug("Unable to index %s %s" % (uid, fileobj)) - return False - return self._ft_index(uid, fp, piece, textprops) - - def _ft_index(self, content_id, fp, piece=DocumentPiece, fields=None): - try: - doc = [piece(fp.read())] - if fields: - # add in properties that need extra fulltext like - # management - for key, value in fields.iteritems(): - doc.append(DocumentPiece(value, key)) - - self.index.addDocument(doc, content_id) - self.index.flush() - return True - except: - logging.debug("fulltext index exception", exc_info=sys.exc_info()) - return False - - - - def fulltext_search(self, *args, **kwargs): - """ - perform search(search_string, ) -> [(content_id, relevance), ...] - - search_string is a string defining the serach in standard web search - syntax. - - ie: it contains a set of search terms. Each search term may be - preceded by a "+" sign to indicate that the term is required, or a "-" - to indicate that is is required to be absent. - - If field_name is not None, it is the prefix of a field, which the - search will be restricted to. 
- - If field_name is None, the search will search all fields by default, - but search terms may be preceded by a fieldname followed by a colon to - restrict part of the search to a given field. - - combiner is one of DocumentStore.OP_OR or DocumentStore.OP_AND, and is - used to indicate the default operator used to combine terms. - - partial is a flag, which should be set to True to enable partial search - matching, for use when doing interactive searches and we're not sure if - the user has finished typing the search yet. - - range_restrictions is a RangeRestrictions object, used to restrict the - search results. - - """ - if len(args) == 1: - # workaround for api change - args = (args[0], 0, 10) - - res = self.index.performSearch(*args, **kwargs) - est = max(1, res.estimatedResultCount()) - return res.getResults(0, est) - - def fulltext_similar(self, *content_ids): - return self.index.findSimilar(content_ids) - - def fulltext_unindex(self, content_id): - self.index.deleteDocument(content_id) - - def stop(self): - if self.use_fulltext: - self.index.close() - - -class DefaultQueryManager(XapianFulltext, SQLiteQueryManager): - - def stop(self): - XapianFulltext.stop(self) - SQLiteQueryManager.stop(self) diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py new file mode 100644 index 0000000..5772433 --- /dev/null +++ b/src/olpc/datastore/xapianindex.py @@ -0,0 +1,271 @@ +""" +xapianindex +~~~~~~~~~~~~~~~~~~~~ +maintain indexes on content + +""" + +__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' +__docformat__ = 'restructuredtext' +__copyright__ = 'Copyright ObjectRealms, LLC, 2007' +__license__ = 'The GNU Public License V2+' + + +from Queue import Queue, Empty +import logging +import re + +import threading +import warnings + +import secore + +from olpc.datastore import model +from olpc.datastore.converter import converter +from olpc.datastore.utils import create_uid + + +# Setup Logger +logger = 
logging.getLogger('org.sugar.datastore.xapianindex') + + +class IndexManager(object): + + def __init__(self, language='en'): + # We will maintain two connections to the database + # we trigger automatic flushes to the read_index + # after any write operation + self.write_index = None + self.read_index = None + self.queue = Queue(0) + self.indexer_running = False + self.language = language + + self.fields = set() + + # + # Initialization + def connect(self, repo): + if self.write_index is not None: + warnings.warn('''Requested redundant connect''', RuntimeWarning) + + self.write_index = secore.IndexerConnection(repo) + self.setupFields() + + self.read_index = secore.SearchConnection(repo) + + # by default we start the indexer now + self.startIndexer() + + def stop(self): + self.stopIndexer() + self.write_index.close() + self.read_index.close() + + + def startIndexer(self): + self.indexer_running = True + self.indexer = threading.Thread(target=self.indexThread, + name="XapianIndexer") + self.indexer.setDaemon(True) + self.indexer.start() + + def stopIndexer(self, force=False): + if not self.indexer_running: return + if not force: self.queue.join() + self.indexer_running = False + self.indexer.join() + + def enque(self, uid, vid, doc): + self.queue.put((uid, vid, doc)) + + def indexThread(self): + # process the queue + while self.indexer_running: + # include timeout here to ease shutdown of the thread + # if this is a non-issue we can simply allow it to block + try: + uid, vid, doc = self.queue.get(timeout=0.5) + self.write_index.add(doc) + self.flush() + logger.info("Indexed Content %s:%s" % (uid, vid)) + self.queue.task_done() + except Empty: + pass + + @property + def working(self): + """Does the indexer have work""" + return not self.queue.empty() + + def flush(self): + """Called after any database mutation""" + self.write_index.flush() + self.read_index.reopen() + + # + # Field management + def addField(self, key, store=True, exact=False, sortable=False, + 
type='string', collapse=False, + **kwargs): + language = kwargs.pop('language', self.language) + + xi = self.write_index.add_field_action + + if store: xi(key, secore.FieldActions.STORE_CONTENT) + if exact: xi(key, secore.FieldActions.INDEX_EXACT) + else: + # weight -- int 1 or more + # nopos -- don't include positional information + # noprefix -- boolean + xi(key, secore.FieldActions.INDEX_FREETEXT, language=language, **kwargs) + + if sortable: + xi(key, secore.FieldActions.SORTABLE, type=type) + if collapse: + xi(key, secore.FieldActions.COLLAPSE) + + # track this to find missing field configurations + self.fields.add(key) + + def setupFields(self): + # add standard fields + # text is content objects information + self.addField('text', store=False, exact=False) + + # vid is version id + self.addField('vid', store=True, exact=True, sortable=True, type="float") + + # Title has additional weight + self.addField('title', store=True, exact=False, weight=2, sortable=True) + + self.addField('mimetype', store=True, exact=True) + self.addField('author', store=True, exact=True) + self.addField('language', store=True, exact=True) + + + self.addField('ctime', store=True, exact=True, sortable=True, type='date') + self.addField('mtime', store=True, exact=True, sortable=True, type='date') + + # + # Index Functions + def index(self, props, filename=None): + """Index the content of an object. 
+ Props must contain the following: + key -> Property() + """ + doc = secore.UnprocessedDocument() + add = doc.fields.append + + if filename: + mimetype = props.get("mimetype") + mimetype = mimetype and mimetype.value or 'text/plain' + fp = converter(filename, mimetype) + + # + # File contents + if fp: + # add the (converted) document contents + add(secore.Field('text', fp.read())) + + # + # Version handling + # + # we implicitly create new versions of documents the version + # id should have been set by the higher level system + uid = props.pop('uid', None) + vid = props.pop('vid', None) + + if uid: uid = uid.value + else: uid = create_uid() + if vid: vid = vid.value + else: vid = "1.0" + + doc.id = uid + add(secore.Field('vid', vid)) + + # + # Property indexing + for k, prop in props.iteritems(): + if isinstance(prop, model.BinaryProperty): continue + value = prop.value + if k not in self.fields: + warnings.warn("""Missing field configuration for %s""" % k, + RuntimeWarning) + continue + add(secore.Field(k, value)) + + # queue the document for processing + self.enque(uid, vid, doc) + + return uid + + # + # Search + def search(self, query, start_index=0, end_index=50): + """search the xapian store. + query is a string defining the serach in standard web search syntax. + + ie: it contains a set of search terms. Each search term may be + preceded by a "+" sign to indicate that the term is required, or a "-" + to indicate that is is required to be absent. 
+ """ + # this will return the [(id, relevance), ...], estimated + # result count + ri = self.read_index + if isinstance(query, dict): + queries = [] + # each term becomes part of the query join + for k, v in query.iteritems(): + queries.append(ri.query_field(k, v)) + q = ri.query_composite(ri.OP_AND, queries) + else: + q = self.parse_query(query) + + + results = ri.search(q, start_index, end_index) + return [r.id for r in results] + + def parse_query(self, query): + # accept standard web query like syntax + # 'this' -- match this + # 'this that' -- match this and that in document + # '"this that"' match the exact pharse 'this that' + # 'title:foo' match a document whose title contains 'foo' + # 'title:"A tale of two datastores"' exact title match + # '-this that' match that w/o this + ri = self.read_index + start = 0 + end = len(query) + nextword = re.compile("(\S+)") + endquote = re.compile('(")') + queries = [] + while start < end: + m = nextword.match(query, start) + if not m: break + orig = start + field = None + start = m.end() + 1 + word = m.group(1) + if ':' in word: + # see if its a field match + fieldname, w = word.split(':', 1) + if fieldname in self.fields: + field = fieldname + + word = w + + if word.startswith('"'): + qm = endquote.search(query, start) + if qm: + #XXX: strip quotes or not here + #word = query[orig+1:qm.end(1)-1] + word = query[orig:qm.end(1)] + start = qm.end(1) + 1 + + if field: + queries.append(ri.query_field(field, word)) + else: + queries.append(ri.query_parse(word)) + q = ri.query_composite(ri.OP_AND, queries) + return q diff --git a/tests/cleaner.py b/tests/cleaner.py new file mode 100755 index 0000000..8cc795b --- /dev/null +++ b/tests/cleaner.py @@ -0,0 +1,39 @@ +#!/usr/bin/python +import os +import re +from ore.main import Application + +filepattern = re.compile("(\w{8})\-(\w{4})\-(\w{4})\-(\w{4})\-(\w{12})") +tmppattern = re.compile("tmp\S{6}") + +staticdirs = re.compile('test_ds|store\d') + +filepatterns = [filepattern, 
tmppattern] +dirpatterns = [staticdirs] + +class Cleaner(Application): + def manage_options(self): + self.parser.add_option("--base", dest="base_dir", + action="store", default='/tmp', + help="""Where to clean (/tmp)""") + + def main(self): + """clean up files left from testing in /tmp""" + # this is done using patterned names + for root, dirs, files in os.walk(self.options.base_dir): + for filename in files: + for pat in filepatterns: + if pat.match(filename): + fn = os.path.join(root, filename) + os.remove(fn) + break + for dirname in dirs: + for pat in dirpatterns: + if pat.match(dirname): + dn = os.path.join(root, dirname) + os.system('rm -rf %s' % dn) + +if __name__ == "__main__": + Cleaner("cleaner")() + + diff --git a/tests/test_xapianindex.py b/tests/test_xapianindex.py new file mode 100644 index 0000000..cf39f01 --- /dev/null +++ b/tests/test_xapianindex.py @@ -0,0 +1,91 @@ +from testutils import waitforindex + +from olpc.datastore.xapianindex import IndexManager +import os +from datetime import datetime + +import time +import unittest +import gnomevfs + +DEFAULT_STORE = '/tmp/_xi_test' + + +def index_file(iconn, filepath): + """Index a file.""" + + mimetype = gnomevfs.get_mime_type(filepath) + main, subtype = mimetype.split('/',1) + + stat = os.stat(filepath) + ctime = datetime.fromtimestamp(stat.st_ctime) + mtime = datetime.fromtimestamp(stat.st_mtime) + + if main in ['image']: filepath = None + if subtype in ['x-trash', 'x-python-bytecode']: filepath = None + + + + props = {'mimetype' : mimetype, 'mtime:date' : mtime, + 'ctime:date' : ctime,} + + if filepath: + fn = os.path.split(filepath)[1] + props['filename'] = fn + + iconn.index(props, filepath) + + return 1 + +def index_path(iconn, docpath): + """Index a path.""" + count = 0 + for dirpath, dirnames, filenames in os.walk(docpath): + for filename in filenames: + filepath = os.path.join(dirpath, filename) + index_file(iconn, filepath) + count += 1 + return count + +class Test(unittest.TestCase): + def 
setUp(self): + if os.path.exists(DEFAULT_STORE): + os.system("rm -rf %s" % DEFAULT_STORE) + + def tearDown(self): + if os.path.exists(DEFAULT_STORE): + os.system("rm -rf %s" % DEFAULT_STORE) + + def test_index(self): + # import a bunch of documents into the store + im = IndexManager() + im.connect(DEFAULT_STORE) + + # test basic index performance + start = time.time() + count = index_path(im, os.getcwd()) + end = time.time() + delta = end - start + + #print "%s in %s %s/sec" % (count, delta, count/delta) + + # wait for indexing to finish + waitforindex(im) + + # test basic search performance + results = list(im.search('peek')[0]) + + # this indicates that we found text inside binary content that + # we expected + assert 'test.pdf' in set(r.get_property('filename') for r in results) + + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(Test)) + return suite + +if __name__ == "__main__": + unittest.main() + diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt new file mode 100644 index 0000000..de495a6 --- /dev/null +++ b/tests/xapianindex.txt @@ -0,0 +1,76 @@ +The xapian index module can be used directly as follows + +First clean up any old test data. + +>>> index_home = "/tmp/xi" +>>> import os, sys, time, logging +>>> assert os.system('rm -rf %s' % index_home) == 0 + +# >>> logging.basicConfig(level=logging.DEBUG, +# ... format="%(asctime)-15s %(name)s %(levelname)s: %(message)s", +# ... stream=sys.stderr) + + +>>> from olpc.datastore.xapianindex import IndexManager +>>> from olpc.datastore import model +>>> im = IndexManager() +>>> im.connect(index_home) + +A small utility method for wrapping a normal dict into proper property +objects. + +>>> def propsdict(**kwargs): +... d = {} +... for k,v in kwargs.iteritems(): +... d[k] = model.Property(k, v) +... return d + + +Now add the file to the index. + +>>> props = propsdict(title="PDF Document", +... 
mimetype="application/pdf")


>>> uid = im.index(props, "test.pdf")

Let the async indexer do its thing. We ask the indexer whether it has work
left; when it has none, we expect our content to be indexed and searchable.

>>> while im.working: time.sleep(0.5)


Searching on a property of the content works.
>>> assert im.search("PDF")[0] == uid

Searching into the binary content of the object works as well.
>>> assert im.search("peek")[0] == uid

Specifying a search that demands a document term be found only in the
title works as well.

>>> assert im.search('title:PDF')[0] == uid
>>> im.search('title:peek')
[]

Searching for documents that are PDFs works as expected. Here we use the
dictionary form of the query, where each field name and its value create
a search clause.
>>> assert im.search(dict(mimetype='application/pdf'))[0] == uid


#Likewise excluding the match works as expected
#>>> im.search('-title:PDF')
#[]


Punctuation is fine.

>>> assert im.search("Don't peek")[0] == uid

As well as quoted strings

>>> assert im.search(r'''"Don't peek"''')[0] == uid

Cleanly shut down.
>>> im.stop()