author     Benjamin Saller <bcsaller@objectrealms.net>   2007-07-12 21:14:06 (GMT)
committer  Benjamin Saller <bcsaller@objectrealms.net>   2007-07-12 21:14:06 (GMT)
commit     f577c2c142c7648a482e0eec7ecd736c1ca716d7 (patch)
tree       259c5cf191116379e97d8aebc260f9664ad3a0e5
parent     d7092a126f230f22344b50d79b8bd362d659953b (diff)
checkpoint new branch before the property type/xapian field merge
-rwxr-xr-x  bin/index-service                         173
-rw-r--r--  etc/org.laptop.sugar.Indexer.service.in     3
-rw-r--r--  src/olpc/datastore/query.py               642
-rw-r--r--  src/olpc/datastore/xapianindex.py         271
-rwxr-xr-x  tests/cleaner.py                           39
-rw-r--r--  tests/test_xapianindex.py                  91
-rw-r--r--  tests/xapianindex.txt                      76
7 files changed, 477 insertions, 818 deletions
diff --git a/bin/index-service b/bin/index-service
deleted file mode 100755
index a2ff83c..0000000
--- a/bin/index-service
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/env python
-
-""" Async index service for the Datastore.
-
-Subscribes to the create/update/delete messages of the Datastore and
-performs the indexing. When this service is enabled the Datastore
-accesses the Xapian repository in read-only mode.
-"""
-
-
-try: from ore.main import Application
-except ImportError: Application = object
-
-from olpc.datastore.datastore import DS_SERVICE, DS_OBJECT_PATH
-from olpc.datastore.datastore import DS_DBUS_INTERFACE
-from olpc.datastore.indexer import Indexer
-import dbus
-import dbus.mainloop.glib
-import logging
-import sys
-import os
-import signal
-
-profile = os.environ.get('SUGAR_PROFILE', 'default')
-base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
-repo_dir = os.path.join(base_dir, 'datastore')
-fulltext_dir = os.path.join(repo_dir, 'fulltext')
-
-log_dir = os.path.join(base_dir, "logs")
-if not os.path.exists(log_dir): os.makedirs(log_dir)
-
-os.chdir(repo_dir)
-
-# setup logger
-filename = None
-if not sys.stdin.isatty():
- filename = os.path.join(log_dir, "indexer.log")
-logging.basicConfig(level=logging.DEBUG,
- format="%(asctime)-15s %(levelname)s: %(message)s",
- filename = filename,
- )
-
-logger = logging.getLogger('org.laptop.sugar.Indexer')
-logger.setLevel(logging.DEBUG)
-
-class IndexService(Application):
- def manage_options(self):
- self.parser.add_option("--olpc.fulltext.repo",
- dest="fulltext_dir",
- action="store", default='fulltext',
- help="""Location of the FullText Repository""")
-
-
- def main(self):
- logging.debug('Starting the index service at %s' % self.options.fulltext_dir)
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SessionBus()
- self.fulltext = Indexer(self.options.fulltext_dir)
- self.fulltext.use_fulltext = True
-
- ds = bus.get_object(DS_SERVICE, DS_OBJECT_PATH)
- self.ds = dbus.Interface(ds, dbus_interface=DS_DBUS_INTERFACE)
-
- self.ds.connect_to_signal("Created", self.created,
- dbus_interface=DS_DBUS_INTERFACE)
-
- self.ds.connect_to_signal("Updated", self.updated,
- dbus_interface=DS_DBUS_INTERFACE)
-
- self.ds.connect_to_signal("Deleted", self.deleted,
- dbus_interface=DS_DBUS_INTERFACE)
-
-
- self.ds.connect_to_signal("Stopped", self.stopped,
- dbus_interface=DS_DBUS_INTERFACE)
-
- self.eventloop.run()
-
- def get_textprops(self, uid):
- # text properties also get full text indexing
- # currently this is still searched with the 'fulltext'
- # parameter of find()
- textprops = {}
- for k,v in self.ds.get_properties(uid, dict(type='text')).items():
- textprops[str(k)] = v and str(v) or ''
- return textprops
-
- def created(self, uid):
- """An object was created on the bus and we want to index it"""
- # because the file isn't encoded anywhere accessible in the
- # create call we must actually get the filename and trigger
- # the indexing on that
- filename = self.ds.get_filename(uid)
- r = None
- if filename:
- mime_type = self.ds.get_properties(uid, {}).get('mime_type', None)
- r = self.fulltext.fulltext_index(uid, filename, mime_type,
- self.get_textprops(uid))
- if r is True:
- logger.debug("index creation of %s" % uid)
- elif r is False:
- logger.debug("unable to index creation of %s" % uid)
- else:
- logger.debug("nothing to index on creation of %s" % uid)
-
- def updated(self, uid):
- """An object was updated on the bus and we want to index it"""
-        # because the file isn't encoded anywhere accessible in the
-        # update call we must actually get the filename and trigger
-        # the indexing on that
- filename = self.ds.get_filename(uid)
- r = None
- if filename:
- mime_type = self.ds.get_properties(uid, {}).get('mime_type',
- None)
- r = self.fulltext.fulltext_index(uid, filename, mime_type,
- self.get_textprops(uid))
- if r is True:
- logger.debug("index update of %s" % uid)
- elif r is False:
- logger.debug("unable to index update of %s" % uid)
- else:
- logger.debug("nothing to index on update of %s" % uid)
-
-
- def deleted(self, uid):
- """An object was updated on the bus and we want to index it"""
- # because the file isn't encoded anywhere accessible in the
- # create call we must actually get the filename and trigger
- # the indexing on that
- try:
- self.fulltext.fulltext_unindex(uid)
- logger.debug("unindex deletion of %s" % uid);
- except KeyError: pass
-
-
- def stopped(self):
- """Respond to the datastore being stopped by shutting down
- ourselves"""
- self.fulltext.stop()
- self.eventloop.quit()
-
-
-if __name__ == "__main__":
- def handle_shutdown(signum, frame):
- idx.stopped()
- print "shutdown cleanly"
- raise SystemExit("Shutting down on signal %s" % signum)
-
- signal.signal(signal.SIGHUP, handle_shutdown)
- signal.signal(signal.SIGTERM, handle_shutdown)
-
- idx = IndexService()
- #idx()
- # w/o ore.main
-
- import gobject
- idx.eventloop = gobject.MainLoop()
- class options(object): pass
- o = options()
- o.fulltext_dir = 'fulltext'
- idx.options = o
- try:
- idx.main()
- except:
- # force logging this one
- logger.setLevel(logging.DEBUG)
- logger.debug("Problem in index service",
- exc_info=sys.exc_info())
- idx.stopped()
-
-
-
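
The removed service followed the standard dbus-python pattern: install a
GLib main loop, proxy the datastore object, and subscribe to its signals.
A minimal sketch of just the subscription wiring, with assumed values for
the constants that the deleted file imported from olpc.datastore.datastore
and a placeholder handler:

    import dbus
    import dbus.mainloop.glib
    import gobject

    # assumed values; the real constants live in olpc.datastore.datastore
    DS_SERVICE = 'org.laptop.sugar.DataStore'
    DS_OBJECT_PATH = '/org/laptop/sugar/DataStore'
    DS_DBUS_INTERFACE = 'org.laptop.sugar.DataStore'

    def created(uid):
        # placeholder: the real service resolved the filename and indexed it
        print 'indexing new object %s' % uid

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    ds = dbus.Interface(bus.get_object(DS_SERVICE, DS_OBJECT_PATH),
                        dbus_interface=DS_DBUS_INTERFACE)
    ds.connect_to_signal("Created", created, dbus_interface=DS_DBUS_INTERFACE)
    gobject.MainLoop().run()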
diff --git a/etc/org.laptop.sugar.Indexer.service.in b/etc/org.laptop.sugar.Indexer.service.in
deleted file mode 100644
index fb0a7ec..0000000
--- a/etc/org.laptop.sugar.Indexer.service.in
+++ /dev/null
@@ -1,3 +0,0 @@
-[D-BUS Service]
-Name = org.laptop.sugar.Indexer
-Exec = @bindir@/index-service
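
This activation file let the session bus spawn the indexer on demand;
@bindir@ is filled in at build time. After substitution the installed
file would read roughly as follows (the /usr/bin prefix is an assumption):

    [D-BUS Service]
    Name = org.laptop.sugar.Indexer
    Exec = /usr/bin/index-service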
diff --git a/src/olpc/datastore/query.py b/src/olpc/datastore/query.py
deleted file mode 100644
index 2c5dd9f..0000000
--- a/src/olpc/datastore/query.py
+++ /dev/null
@@ -1,642 +0,0 @@
-"""
-olpc.datastore.query
-~~~~~~~~~~~~~~~~~~~~
-manage the metadata index and make it queryable. This in turn
-depends on olpc.datastore.fulltext, which indexes the actual content.
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-
-from datetime import datetime
-from lemur.xapian.sei import DocumentStore, DocumentPiece, SortableValue
-from olpc.datastore.converter import converter
-from olpc.datastore.model import DateProperty, TextProperty
-from olpc.datastore.model import Model, Content, Property, propertyByKind
-from olpc.datastore.utils import create_uid
-
-from sqlalchemy import create_engine, BoundMetaData
-from sqlalchemy import select, intersect, and_
-import atexit
-import logging
-import os, sys
-
-_marker = object()
-
-
-class SugarDomain(object):
- """The underlying property set used for metadata in the sugar
- system"""
- def kind_by_key(self, key):
- """resolves property names to the factory type that supports
- them in the model
- """
- # key may be a two part form directly indicating the property
- # type
- if ':' in key:
- key, kind = key.split(':', 1)
- # now resolve the kind to a property class
- return key, propertyByKind(kind)
-
- return key, {
- 'ctime' : DateProperty,
- 'mtime' : DateProperty,
- 'author' : Property,
- 'title' : TextProperty,
- 'mime_type' : Property,
- 'language' : Property,
- }.get(key, Property)
-
- def propertyFactory(self, key, value='', dict=None):
- key, kind = self.kind_by_key(key)
- p = kind(key, value)
- if dict is not None: dict[key] = p
- return kind
-
- def _automaticProperties(self):
- d = {}
- now = datetime.now()
- self.propertyFactory('mtime', now, dict=d)
- return d
-
- def _defaultProperties(self):
- d = {}
- now = datetime.now()
- self.propertyFactory('ctime', now, dict=d)
- self.propertyFactory('author', dict=d)
- self.propertyFactory('title', dict=d)
- self.propertyFactory('mime_type', dict=d)
- self.propertyFactory('language', dict=d)
-
- d.update(self._automaticProperties())
- return d
-
- def _normalizeProps(self, props, creating, include_defaults):
- # return a dict of {name : property}
- if isinstance(props, dict):
- # convert it into a dict of Property objects
- d = {}
- for k,v in props.iteritems():
- k, kind = self.kind_by_key(k)
- p = kind(k, v)
- d[k] = p
- if creating and include_defaults:
- defaults = self._defaultProperties()
- for k, v in defaults.iteritems():
- if k not in d: d[k] = v
- props = d
- else:
- d = {}
- for p in props:
- d[p.key] = p
- props = d
- return props
-
-
-
-class QueryManager(SugarDomain):
- FULLTEXT_NAME = "fulltext"
-
- def __init__(self, metadata_uri, **options):
- """
- The metadata_uri is a string used to find the database.
-
-
- This will check keywords for:
- 'language' Language is the language code used in the fulltext
- engine. This helps improve stemming and
- so on. In the future additional control
- will be provided.
-
- 'sync_index' which determines if we use an internal
- sync index impl or an out of process one
- via DBus. If the async process is to be
- used it must be properly configured and
- available for DBus to spawn.
-
- 'fulltext_repo' the full filepath to which the fulltext
- index data will be stored
-
- 'use_fulltext' when true indexing will be performed
-
- """
- self.uri = metadata_uri
- self.options = options
-
- self.backingstore = None
- self.content_ext = None
-
-
- def _handle_option(self, options, key, default=_marker):
- value = options.get(key, default)
- if value is _marker: raise KeyError(key)
- setattr(self, key, value)
-
- def _handle_options(self, **kwargs):
- self._handle_option(kwargs, 'fulltext_repo')
- self._handle_option(kwargs, 'use_fulltext', True)
- self._handle_option(kwargs, 'sync_index', True)
- self._handle_option(kwargs, 'language', 'en')
- self.sync_index = self.use_fulltext and self.sync_index
-
- def bind_to(self, backingstore):
- self.backingstore = backingstore
-
- def prepare_index(self):
- self.connect_db()
- self.prepare_db()
- self.connect_model()
-
- def prepare_fulltext(self):
- self.connect_fulltext(self.fulltext_repo, self.language,
- read_only=not self.sync_index)
-
- def prepare(self):
- """This is called by the datastore with its backingstore and
-        querymanager. It's assumed that the querymanager is None and
-        we are the first in this release.
- """
- self._handle_options(**self.options)
- self.prepare_index()
- self.prepare_fulltext()
- return True
-
- def stop(self):
- pass
-
- # Primary interface
- def create(self, props, filelike=None, include_defaults=True):
- """Props can either be a dict of k,v pairs or a sequence of
- Property objects.
-
-        The advantage of using property objects is that the data can
-        be typed. When k/v pairs are used a default string type will
- be chosen.
-
- When include_defaults is True a default set of properties are
- created on behalf of the Content if they were not provided.
-
- These include:
- author : ''
- title : ''
- mime_type : ''
- language : '',
- ctime : '',
- mtime : '',
- """
- s = self.model.session
- c = Content()
- # its important the id be set before other operations
- c.id = create_uid()
- s.save(c)
-
- self._bindProperties(c, props, creating=True, include_defaults=include_defaults)
- s.flush()
- c.backingstore = self.backingstore
-
- if self.sync_index and filelike:
- self.fulltext_index(c.id, filelike,
- mimetype=c.get_property('mime_type'),
- textprops=self.get_textprops(c))
-
- return c
-
- def update(self, content_or_uid, props=None, filelike=None):
- content = self._resolve(content_or_uid)
- content.backingstore = self.backingstore
- if props is not None:
- self._bindProperties(content, props, creating=False)
- self.model.session.flush()
-
- if self.sync_index and filelike:
- self.fulltext_index(content.id, filelike, textprops=self.get_textprops(content))
-
-
- def _bindProperties(self, content, props, creating=False, include_defaults=False):
- """Handle either a dict of properties or a list of property
- objects, binding them to the content instance.
- """
- # for information on include_defaults see create()
- # default properties are only provided when creating is True
- session = self.model.session
-
- props = self._normalizeProps(props, creating,
- include_defaults)
-
- # we should have a dict of property objects
- if creating:
- content.properties.extend(props.values())
- else:
- # if the automatically maintained properties (like mtime)
- # are not set, include them now
- auto = self._automaticProperties()
- auto.update(props)
- props = auto
- # we have to check for the update case
- oldProps = dict([(p.key, p) for p in content.properties])
- for k, p in props.iteritems():
- if k in oldProps:
- oldProps[k].value = p.value
- oldProps[k].type = p.type
- else:
- content.properties.append(p)
-
- def get(self, uid):
- return self.model.session.query(self.model.mappers['content']).get(uid)
-
- def get_properties(self, content_or_uid, keys):
- c = self._resolve(content_or_uid)
- return self.model.session.query(Property).select_by(self.model.property.c.key.in_(keys),
- content_id=c.id)
-
-
- def get_uniquevaluesfor(self, propertyname):
- properties = self.model.tables['properties']
- return [r[0] for r in select([properties.c.value],
- properties.c.key==propertyname,
- distinct=True).execute().fetchall()]
-
-
-
- def delete(self, content_or_uid):
- c = self._resolve(content_or_uid)
- s = self.model.session
- s.delete(c)
- s.flush()
- if self.sync_index:
- self.fulltext_unindex(c.id)
-
-
- def find(self, query=None, **kwargs):
- """
-        Dates can be searched in one of two ways.
- date='YYYY-MM-DD HH:MM:SS'
- date={'start' : 'YYYY-MM-DD HH:MM:SS',
- 'end' : 'YYYY-MM-DD HH:MM:SS'
- }
- where date is either ctime or mtime.
-        If start or end is omitted it becomes a simple before/after
-        style query. If both are provided it is a between query.
-
-        Providing the key 'fulltext' will include a full text search
-        of content matching its parameters. See fulltext_search for
-        additional details.
-
-
- If 'limit' is passed it will be the maximum number of results
- to return and 'offset' will be the offset from 0 into the
- result set to return.
-
- """
-
- # XXX: this will have to be expanded, but in its simplest form
- if not self.sync_index: self.index.reopen()
-
- s = self.model.session
- properties = self.model.tables['properties']
- if not query: query = {}
- query.update(kwargs)
- q = s.query(Content)
- # rewrite the query to reference properties
-        # XXX: if there is a 'content' key we will have to search
- # the content using the full text index which will result in a
- # list of id's which must be mapped into the query
- # fulltext_threshold is the minimum acceptable relevance score
- limit = query.pop('limit', None)
- offset = query.pop('offset', None)
-
- if offset: q = q.offset(offset)
- if limit: q = q.limit(limit)
-
- if query:
- where = []
- fulltext = query.pop('fulltext', None)
- threshold = query.pop('fulltext_threshold', 60)
-
-
-
- statement = None
- ft_select = None
-
- if query:
- # daterange support
- # XXX: this is sort of a hack because
- # - it relies on Manifest typing in sqlite
- # - value's type is not normalized
- # - we make special exception based on property name
- # if we need special db handling of dates ctime/mtime
- # will become columns of Content and not properties
- ctime = query.pop('ctime', None)
- mtime = query.pop('mtime', None)
- if ctime or mtime:
- self._query_dates(ctime, mtime, where)
- for k,v in query.iteritems():
- if isinstance(v, list):
- v = properties.c.value.in_(*v)
- else:
- v = properties.c.value==v
-
- where.append(select([properties.c.content_id],
- and_( properties.c.key==k,
- v)))
-
- statement = intersect(*where)
- statement.distinct=True
-
- if fulltext and self.use_fulltext:
- # perform the full text search and map the id's into
- # the statement for inclusion
- ft_res = self.fulltext_search(fulltext)
- if ft_res:
- ft_ids = [ft[0] for ft in ft_res if ft[1] >=
- threshold]
-
- if ft_ids:
- ft_select = select([properties.c.content_id],
- properties.c.content_id.in_(*ft_ids))
-
- if ft_select is None:
- # the full text query eliminated the possibility
- # of results by returning nothing under a logical
- # AND condition, bail now
- return ([], 0)
- else:
- if statement is None:
- statement = ft_select
- statement.distinct = True
- else:
- statement = intersect(statement, ft_select)
-
- result = statement.execute()
- r = [q.get(i[0]) for i in result]
- r = (r, len(r))
- else:
- r = (q.select(), q.count())
-
- # XXX: make sure the proper backingstore is mapped
- # this currently forbids the use case of keeping index data
- # for a read-only store.
- for item in r[0]:
- item.backingstore = self.backingstore
-
- return r
-
- # sqla util
- def _resolve(self, content_or_uid):
- if isinstance(content_or_uid, basestring):
- # we need to resolve the object
- content_or_uid = self.model.session.query(Content).get(content_or_uid)
- return content_or_uid
-
- def _query_dates(self, ctime, mtime, selects):
- if ctime: selects.append(self._query_date('ctime', ctime))
- if mtime: selects.append(self._query_date('mtime', mtime))
-
- def _query_date(self, key, date):
- properties = self.model.properties
-
- if isinstance(date, basestring):
- s = select([properties.c.content_id],
- and_( properties.c.key==key,
- properties.c.value==date))
- else:
- # its a dict with start/end
- start = date.get('start')
- end = date.get('end')
- if start and end:
- s = select([properties.c.content_id],
- and_( properties.c.key==key,
- properties.c.value.between(start,
- end)))
- elif start:
- s = select([properties.c.content_id],
- and_( properties.c.key==key,
- properties.c.value >=start))
- else:
- s = select([properties.c.content_id],
- and_( properties.c.key==key,
- properties.c.value < end))
-
- return s
-
-
- def get_textprops(self, uid_or_content):
- # text properties also get full text indexing
- # currently this is still searched with the 'fulltext'
- # parameter of find()
- content = self._resolve(uid_or_content)
- textprops = {}
- for p in content.get_properties(type='text'):
- textprops[p.key] = p.value and p.value or ''
- return textprops
-
-
- # fulltext interface
- def fulltext_index(self, uid, fileobj, mimetype=None, textprops=None):
- """Index the fileobj relative to uid which should be a
- olpc.datastore.model.Content object's uid. The fileobj can be
- either a pathname or an object implementing the Python file
- ('read') interface.
- """
- pass
-
- def fulltext_unindex(self, content_id):
- pass
-
- def fulltext_search(self, *args, **kwargs):
- return []
-
- # lifecycle
- def connect_db(self):
- """Connect to the underlying database. Called implicitly by
- __init__"""
- pass
-
-
- def prepare_db(self):
- """After connecting to the metadata database take any
- initialization steps needed for the environment.
-
- This is called implicitly by __init__ before the model is
- brought online.
- """
- pass
-
- def connect_model(self, model):
- """Connect the model. Called with the model passed into
- __init__ after the database has been prepared.
- """
- pass
-
- def connect_fulltext(self, repo, language, read_only):
- """Connect the full text index"""
- pass
-
-
-class SQLiteQueryManager(QueryManager):
- """The default implementation of the query manager. This owns the
- model object and the fulltext object
- """
-
- def __init__(self, uri, **kwargs):
- super(SQLiteQueryManager, self).__init__(uri, **kwargs)
- # now re-write the URI to be sqlite specific
- # (we were initialized to a namepattern in the proper
- # directory by the backingstore)
- self.uri= "sqlite:///%s.db" % self.uri
-
- def connect_db(self):
- self.db = create_engine(self.uri)
- self.metadata = BoundMetaData(self.db)
-
- def prepare_db(self):
- # Using the sqlite backend we can tune the performance to
- # limit writes as much as possible
- if self.db.name.startswith('sqlite'):
- connection = self.db.connect()
-            # cut down on per-activity file locking writes
- connection.execute("PRAGMA locking_mode=EXCLUSIVE")
-            # don't demand fsync -- if this proves too dangerous
-            # we can change it to NORMAL, which still writes less
-            # than the default FULL
- connection.execute("PRAGMA synchronous=OFF")
- # temporary tables and indices are kept in memory
- connection.execute("PRAGMA temp_store=MEMORY")
- # XXX: what is the ideal jffs2 page size
- # connection.execute("PRAGMA page_size 4096")
-
- def connect_model(self, model=None):
- if model is None: model = Model()
- # take the model and connect it to us
- model.prepare(self)
-
- # make sure all the tables and indexes exist
- self.metadata.create_all()
-
- self.model = model
-
-
- def stop(self):
- # clean up
- self.db.dispose()
-
-# Full text support
-def flatten_unicode(value): return value.encode('utf-8')
-
-class XapianBinaryValue(SortableValue):
- def __init__(self, value, field_name="content"):
- SortableValue.__init__(self, value, field_name)
-
-class XapianFulltext(object):
- def connect_fulltext(self, repo, language='en', read_only=True):
- if not os.path.exists(repo) and read_only is True:
- # create the store
- index = DocumentStore(repo, language, read_only=False)
- index.close()
- # and abandon it
- self.index = DocumentStore(repo, language, read_only=read_only)
- self.index.registerFlattener(unicode, flatten_unicode)
- atexit.register(self.index.close)
-
- def fulltext_index(self, uid, fileobj, mimetype=None, textprops=None):
- """Index the fileobj relative to uid which should be a
- olpc.datastore.model.Content's uid. The fileobj can be either
- a pathname or an object implementing the Python file ('read')
- interface.
- """
- piece = DocumentPiece
- if isinstance(fileobj, basestring):
- # treat it as a pathname
- # use the global converter to try to get text from the
- # file
- fp = converter(fileobj, mimetype=mimetype)
- #piece = XapianBinaryValue
- elif hasattr(fileobj, 'read'):
- # this is an off case, we have to assume utf-8 data
- logging.debug("Indexing from readable, not filename")
- fp = fileobj
- else:
- raise ValueError("Not a valid file object")
-
- if fp is None:
- # for whatever reason we were unable to get the content
- # into an indexable form.
- logging.debug("Unable to index %s %s" % (uid, fileobj))
- return False
- return self._ft_index(uid, fp, piece, textprops)
-
- def _ft_index(self, content_id, fp, piece=DocumentPiece, fields=None):
- try:
- doc = [piece(fp.read())]
- if fields:
- # add in properties that need extra fulltext like
- # management
- for key, value in fields.iteritems():
- doc.append(DocumentPiece(value, key))
-
- self.index.addDocument(doc, content_id)
- self.index.flush()
- return True
- except:
- logging.debug("fulltext index exception", exc_info=sys.exc_info())
- return False
-
-
-
- def fulltext_search(self, *args, **kwargs):
- """
-        perform search(search_string) -> [(content_id, relevance), ...]
-
-        search_string is a string defining the search in standard web search
- syntax.
-
- ie: it contains a set of search terms. Each search term may be
- preceded by a "+" sign to indicate that the term is required, or a "-"
-        to indicate that it is required to be absent.
-
- If field_name is not None, it is the prefix of a field, which the
- search will be restricted to.
-
- If field_name is None, the search will search all fields by default,
- but search terms may be preceded by a fieldname followed by a colon to
- restrict part of the search to a given field.
-
- combiner is one of DocumentStore.OP_OR or DocumentStore.OP_AND, and is
- used to indicate the default operator used to combine terms.
-
- partial is a flag, which should be set to True to enable partial search
- matching, for use when doing interactive searches and we're not sure if
- the user has finished typing the search yet.
-
- range_restrictions is a RangeRestrictions object, used to restrict the
- search results.
-
- """
- if len(args) == 1:
- # workaround for api change
- args = (args[0], 0, 10)
-
- res = self.index.performSearch(*args, **kwargs)
- est = max(1, res.estimatedResultCount())
- return res.getResults(0, est)
-
- def fulltext_similar(self, *content_ids):
- return self.index.findSimilar(content_ids)
-
- def fulltext_unindex(self, content_id):
- self.index.deleteDocument(content_id)
-
- def stop(self):
- if self.use_fulltext:
- self.index.close()
-
-
-class DefaultQueryManager(XapianFulltext, SQLiteQueryManager):
-
- def stop(self):
- XapianFulltext.stop(self)
- SQLiteQueryManager.stop(self)
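
Before its removal, QueryManager.find() combined property filters, date
ranges, and a fulltext term in a single call. A sketch of the calling
convention its docstring describes, where qm stands for a prepared
DefaultQueryManager and all values are illustrative:

    # returns (list of Content, count)
    results, count = qm.find(mime_type='application/pdf',
                             mtime={'start': '2007-01-01 00:00:00',
                                    'end': '2007-07-01 00:00:00'},
                             fulltext='peek',
                             limit=10)
    for content in results:
        print content.get_property('title')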
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
new file mode 100644
index 0000000..5772433
--- /dev/null
+++ b/src/olpc/datastore/xapianindex.py
@@ -0,0 +1,271 @@
+"""
+xapianindex
+~~~~~~~~~~~~~~~~~~~~
+maintain indexes on content
+
+"""
+
+__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
+__docformat__ = 'restructuredtext'
+__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
+__license__ = 'The GNU Public License V2+'
+
+
+from Queue import Queue, Empty
+import logging
+import re
+
+import threading
+import warnings
+
+import secore
+
+from olpc.datastore import model
+from olpc.datastore.converter import converter
+from olpc.datastore.utils import create_uid
+
+
+# Setup Logger
+logger = logging.getLogger('org.sugar.datastore.xapianindex')
+
+
+class IndexManager(object):
+
+ def __init__(self, language='en'):
+ # We will maintain two connections to the database
+ # we trigger automatic flushes to the read_index
+ # after any write operation
+ self.write_index = None
+ self.read_index = None
+ self.queue = Queue(0)
+ self.indexer_running = False
+ self.language = language
+
+ self.fields = set()
+
+ #
+ # Initialization
+ def connect(self, repo):
+ if self.write_index is not None:
+ warnings.warn('''Requested redundant connect''', RuntimeWarning)
+
+ self.write_index = secore.IndexerConnection(repo)
+ self.setupFields()
+
+ self.read_index = secore.SearchConnection(repo)
+
+ # by default we start the indexer now
+ self.startIndexer()
+
+ def stop(self):
+ self.stopIndexer()
+ self.write_index.close()
+ self.read_index.close()
+
+
+ def startIndexer(self):
+ self.indexer_running = True
+ self.indexer = threading.Thread(target=self.indexThread,
+ name="XapianIndexer")
+ self.indexer.setDaemon(True)
+ self.indexer.start()
+
+ def stopIndexer(self, force=False):
+ if not self.indexer_running: return
+ if not force: self.queue.join()
+ self.indexer_running = False
+ self.indexer.join()
+
+ def enque(self, uid, vid, doc):
+ self.queue.put((uid, vid, doc))
+
+ def indexThread(self):
+ # process the queue
+ while self.indexer_running:
+ # include timeout here to ease shutdown of the thread
+ # if this is a non-issue we can simply allow it to block
+ try:
+ uid, vid, doc = self.queue.get(timeout=0.5)
+ self.write_index.add(doc)
+ self.flush()
+ logger.info("Indexed Content %s:%s" % (uid, vid))
+ self.queue.task_done()
+ except Empty:
+ pass
+
+ @property
+ def working(self):
+ """Does the indexer have work"""
+ return not self.queue.empty()
+
+ def flush(self):
+ """Called after any database mutation"""
+ self.write_index.flush()
+ self.read_index.reopen()
+
+ #
+ # Field management
+ def addField(self, key, store=True, exact=False, sortable=False,
+ type='string', collapse=False,
+ **kwargs):
+ language = kwargs.pop('language', self.language)
+
+ xi = self.write_index.add_field_action
+
+ if store: xi(key, secore.FieldActions.STORE_CONTENT)
+ if exact: xi(key, secore.FieldActions.INDEX_EXACT)
+ else:
+ # weight -- int 1 or more
+ # nopos -- don't include positional information
+ # noprefix -- boolean
+ xi(key, secore.FieldActions.INDEX_FREETEXT, language=language, **kwargs)
+
+ if sortable:
+ xi(key, secore.FieldActions.SORTABLE, type=type)
+ if collapse:
+ xi(key, secore.FieldActions.COLLAPSE)
+
+ # track this to find missing field configurations
+ self.fields.add(key)
+
+ def setupFields(self):
+ # add standard fields
+ # text is content objects information
+ self.addField('text', store=False, exact=False)
+
+ # vid is version id
+ self.addField('vid', store=True, exact=True, sortable=True, type="float")
+
+ # Title has additional weight
+ self.addField('title', store=True, exact=False, weight=2, sortable=True)
+
+ self.addField('mimetype', store=True, exact=True)
+ self.addField('author', store=True, exact=True)
+ self.addField('language', store=True, exact=True)
+
+
+ self.addField('ctime', store=True, exact=True, sortable=True, type='date')
+ self.addField('mtime', store=True, exact=True, sortable=True, type='date')
+
+ #
+ # Index Functions
+ def index(self, props, filename=None):
+ """Index the content of an object.
+ Props must contain the following:
+ key -> Property()
+ """
+ doc = secore.UnprocessedDocument()
+ add = doc.fields.append
+
+ if filename:
+ mimetype = props.get("mimetype")
+ mimetype = mimetype and mimetype.value or 'text/plain'
+ fp = converter(filename, mimetype)
+
+ #
+ # File contents
+ if fp:
+ # add the (converted) document contents
+ add(secore.Field('text', fp.read()))
+
+ #
+ # Version handling
+ #
+        # we implicitly create new versions of documents; the version
+        # id should have been set by the higher level system
+ uid = props.pop('uid', None)
+ vid = props.pop('vid', None)
+
+ if uid: uid = uid.value
+ else: uid = create_uid()
+ if vid: vid = vid.value
+ else: vid = "1.0"
+
+ doc.id = uid
+ add(secore.Field('vid', vid))
+
+ #
+ # Property indexing
+ for k, prop in props.iteritems():
+ if isinstance(prop, model.BinaryProperty): continue
+ value = prop.value
+ if k not in self.fields:
+ warnings.warn("""Missing field configuration for %s""" % k,
+ RuntimeWarning)
+ continue
+ add(secore.Field(k, value))
+
+ # queue the document for processing
+ self.enque(uid, vid, doc)
+
+ return uid
+
+ #
+ # Search
+ def search(self, query, start_index=0, end_index=50):
+ """search the xapian store.
+ query is a string defining the serach in standard web search syntax.
+
+ ie: it contains a set of search terms. Each search term may be
+ preceded by a "+" sign to indicate that the term is required, or a "-"
+    to indicate that it is required to be absent.
+ """
+        # build the query, run it, and return the ids of the
+        # matching documents
+ ri = self.read_index
+ if isinstance(query, dict):
+ queries = []
+ # each term becomes part of the query join
+ for k, v in query.iteritems():
+ queries.append(ri.query_field(k, v))
+ q = ri.query_composite(ri.OP_AND, queries)
+ else:
+ q = self.parse_query(query)
+
+
+ results = ri.search(q, start_index, end_index)
+ return [r.id for r in results]
+
+ def parse_query(self, query):
+ # accept standard web query like syntax
+ # 'this' -- match this
+ # 'this that' -- match this and that in document
+ # '"this that"' match the exact pharse 'this that'
+ # 'title:foo' match a document whose title contains 'foo'
+ # 'title:"A tale of two datastores"' exact title match
+ # '-this that' match that w/o this
+ ri = self.read_index
+ start = 0
+ end = len(query)
+ nextword = re.compile("(\S+)")
+ endquote = re.compile('(")')
+ queries = []
+ while start < end:
+ m = nextword.match(query, start)
+ if not m: break
+ orig = start
+ field = None
+ start = m.end() + 1
+ word = m.group(1)
+ if ':' in word:
+            # see if it's a field match
+ fieldname, w = word.split(':', 1)
+ if fieldname in self.fields:
+ field = fieldname
+
+ word = w
+
+ if word.startswith('"'):
+ qm = endquote.search(query, start)
+ if qm:
+ #XXX: strip quotes or not here
+ #word = query[orig+1:qm.end(1)-1]
+ word = query[orig:qm.end(1)]
+ start = qm.end(1) + 1
+
+ if field:
+ queries.append(ri.query_field(field, word))
+ else:
+ queries.append(ri.query_parse(word))
+ q = ri.query_composite(ri.OP_AND, queries)
+ return q
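
Taken together, the new IndexManager is driven by handing it a dict of
Property objects and letting the background thread drain the queue. A
minimal usage sketch, distilled from the tests added below (the repo
path is illustrative, and test.pdf is assumed to exist as in the tests):

    import time
    from olpc.datastore import model
    from olpc.datastore.xapianindex import IndexManager

    im = IndexManager()
    im.connect('/tmp/xi-example')      # illustrative repo path

    props = {'title': model.Property('title', 'PDF Document'),
             'mimetype': model.Property('mimetype', 'application/pdf')}
    uid = im.index(props, 'test.pdf')  # queues the document for indexing

    while im.working:                  # wait for the indexer thread
        time.sleep(0.5)

    assert im.search('title:PDF')[0] == uid
    im.stop()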
diff --git a/tests/cleaner.py b/tests/cleaner.py
new file mode 100755
index 0000000..8cc795b
--- /dev/null
+++ b/tests/cleaner.py
@@ -0,0 +1,39 @@
+#!/usr/bin/python
+import os
+import re
+from ore.main import Application
+
+filepattern = re.compile("(\w{8})\-(\w{4})\-(\w{4})\-(\w{4})\-(\w{12})")
+tmppattern = re.compile("tmp\S{6}")
+
+staticdirs = re.compile('test_ds|store\d')
+
+filepatterns = [filepattern, tmppattern]
+dirpatterns = [staticdirs]
+
+class Cleaner(Application):
+ def manage_options(self):
+ self.parser.add_option("--base", dest="base_dir",
+ action="store", default='/tmp',
+ help="""Where to clean (/tmp)""")
+
+ def main(self):
+ """clean up files left from testing in /tmp"""
+ # this is done using patterned names
+ for root, dirs, files in os.walk(self.options.base_dir):
+ for filename in files:
+ for pat in filepatterns:
+ if pat.match(filename):
+ fn = os.path.join(root, filename)
+ os.remove(fn)
+ break
+ for dirname in dirs:
+ for pat in dirpatterns:
+ if pat.match(dirname):
+ dn = os.path.join(root, dirname)
+ os.system('rm -rf %s' % dn)
+
+if __name__ == "__main__":
+ Cleaner("cleaner")()
+
+
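
The cleaner identifies test leftovers purely by name. A quick check of
what the patterns actually match, with illustrative file names:

    import re

    filepattern = re.compile("(\w{8})\-(\w{4})\-(\w{4})\-(\w{4})\-(\w{12})")
    tmppattern = re.compile("tmp\S{6}")

    # uid-style datastore names and mkstemp-style names are caught
    assert filepattern.match("0ae39609-5f5b-4797-9a9f-c8c0b90406c5")
    assert tmppattern.match("tmpa1b2c3")
    # ordinary files are left alone
    assert not filepattern.match("notes.txt")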
diff --git a/tests/test_xapianindex.py b/tests/test_xapianindex.py
new file mode 100644
index 0000000..cf39f01
--- /dev/null
+++ b/tests/test_xapianindex.py
@@ -0,0 +1,91 @@
+from testutils import waitforindex
+
+from olpc.datastore.xapianindex import IndexManager
+import os
+from datetime import datetime
+
+import time
+import unittest
+import gnomevfs
+
+DEFAULT_STORE = '/tmp/_xi_test'
+
+
+def index_file(iconn, filepath):
+ """Index a file."""
+
+ mimetype = gnomevfs.get_mime_type(filepath)
+ main, subtype = mimetype.split('/',1)
+
+ stat = os.stat(filepath)
+ ctime = datetime.fromtimestamp(stat.st_ctime)
+ mtime = datetime.fromtimestamp(stat.st_mtime)
+
+ if main in ['image']: filepath = None
+ if subtype in ['x-trash', 'x-python-bytecode']: filepath = None
+
+
+
+ props = {'mimetype' : mimetype, 'mtime:date' : mtime,
+ 'ctime:date' : ctime,}
+
+ if filepath:
+ fn = os.path.split(filepath)[1]
+ props['filename'] = fn
+
+ iconn.index(props, filepath)
+
+ return 1
+
+def index_path(iconn, docpath):
+ """Index a path."""
+ count = 0
+ for dirpath, dirnames, filenames in os.walk(docpath):
+ for filename in filenames:
+ filepath = os.path.join(dirpath, filename)
+ index_file(iconn, filepath)
+ count += 1
+ return count
+
+class Test(unittest.TestCase):
+ def setUp(self):
+ if os.path.exists(DEFAULT_STORE):
+ os.system("rm -rf %s" % DEFAULT_STORE)
+
+ def tearDown(self):
+ if os.path.exists(DEFAULT_STORE):
+ os.system("rm -rf %s" % DEFAULT_STORE)
+
+ def test_index(self):
+ # import a bunch of documents into the store
+ im = IndexManager()
+ im.connect(DEFAULT_STORE)
+
+ # test basic index performance
+ start = time.time()
+ count = index_path(im, os.getcwd())
+ end = time.time()
+ delta = end - start
+
+ #print "%s in %s %s/sec" % (count, delta, count/delta)
+
+ # wait for indexing to finish
+ waitforindex(im)
+
+ # test basic search performance
+ results = list(im.search('peek')[0])
+
+ # this indicates that we found text inside binary content that
+ # we expected
+ assert 'test.pdf' in set(r.get_property('filename') for r in results)
+
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(Test))
+ return suite
+
+if __name__ == "__main__":
+ unittest.main()
+
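
The waitforindex helper imported from testutils is not part of this
commit. Given the IndexManager.working property above, a minimal
equivalent would presumably look like this (the flush keeps the read
connection in sync once the queue drains):

    import time

    def waitforindex(im, interval=0.5):
        # poll until the indexer thread has drained its queue
        while im.working:
            time.sleep(interval)
        im.flush()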
diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt
new file mode 100644
index 0000000..de495a6
--- /dev/null
+++ b/tests/xapianindex.txt
@@ -0,0 +1,76 @@
+The xapian index module can be used directly, as follows.
+
+First clean up any old test data.
+
+>>> index_home = "/tmp/xi"
+>>> import os, sys, time, logging
+>>> assert os.system('rm -rf %s' % index_home) == 0
+
+# >>> logging.basicConfig(level=logging.DEBUG,
+# ... format="%(asctime)-15s %(name)s %(levelname)s: %(message)s",
+# ... stream=sys.stderr)
+
+
+>>> from olpc.datastore.xapianindex import IndexManager
+>>> from olpc.datastore import model
+>>> im = IndexManager()
+>>> im.connect(index_home)
+
+A small utility method for wrapping a normal dict into proper property
+objects.
+
+>>> def propsdict(**kwargs):
+... d = {}
+... for k,v in kwargs.iteritems():
+... d[k] = model.Property(k, v)
+... return d
+
+
+Now add the file to the index.
+
+>>> props = propsdict(title="PDF Document",
+... mimetype="application/pdf")
+
+
+>>> uid = im.index(props, "test.pdf")
+
+Let the async indexer do its thing. We ask the indexer if it has work
+left; when it has none, we expect our content to be indexed and searchable.
+
+>>> while im.working: time.sleep(0.5)
+
+
+Searching on a property of the content works.
+>>> assert im.search("PDF")[0] == uid
+
+Searching into the binary content of the object works as well.
+>>> assert im.search("peek")[0] == uid
+
+Specifying a search that demands a document term be found only in the
+title works as well.
+
+>>> assert im.search('title:PDF')[0] == uid
+>>> im.search('title:peek')
+[]
+
+Searching for documents that are PDF works as expected. Here we use
+the dictionary form of the query, where each field name is given and
+creates a search.
+>>> assert im.search(dict(mimetype='application/pdf'))[0] == uid
+
+
+#Likewise excluding the match works as expected
+#>>> im.search('-title:PDF')
+#[]
+
+
+Punctuation is fine.
+
+>>> assert im.search("Don't peek")[0] == uid
+
+As well as quoted strings.
+
+>>> assert im.search(r'''"Don't peek"''')[0] == uid
+
+Cleanly shut down.
+>>> im.stop()