diff options
Diffstat (limited to 'src/olpc/datastore/xapianindex.py')
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 198 |
1 files changed, 152 insertions, 46 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index d46e0c7..5f29feb 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -15,6 +15,7 @@ __license__ = 'The GNU Public License V2+' from Queue import Queue, Empty import logging +import os import re import sys import time @@ -23,6 +24,7 @@ import threading import warnings import secore +import xapian as _xapian # we need to modify the QueryParser from olpc.datastore import model from olpc.datastore.converter import converter @@ -127,7 +129,12 @@ class IndexManager(object): self.write_index.flush() self.read_index.reopen() - + def enqueSequence(self, commands): + """Takes a sequence of arugments to the normal enque function + and executes them under a single lock/flush cycle + """ + self.queue.put(commands) + def enque(self, uid, vid, doc, operation, filestuff=None): # here we implement the sync/async policy # we want to take create/update operations and @@ -162,52 +169,76 @@ class IndexManager(object): # from the item in the repo as that will become our immutable # copy. Detect those cases and use the internal filename # property or backingstore._translatePath to get at it + versions = self.versions + inplace = self.inplace + while self.indexer_running: # include timeout here to ease shutdown of the thread # if this is a non-issue we can simply allow it to block try: data = self.queue.get(True, 0.025) - uid, vid, doc, operation, filestuff = data + # when we enque a sequence of commands they happen + # under a single write lock pass through the loop and + # the changes become visible at once. + + if not isinstance(data[0], (list, tuple)): + data = (data,) except Empty: - #time.sleep(1.0) continue try: with self._write_lock: - if operation is DELETE: - self.write_index.delete(uid) - logger.info("deleted content %s" % (uid,)) - elif operation is UPDATE: - # Here we handle the conversion of binary - # documents to plain text for indexing. This is - # done in the thread to keep things async and - # latency lower. - # we know that there is filestuff or it - # wouldn't have been queued - filename, mimetype = filestuff - if isinstance(filename, file): - filename = filename.name - fp = converter(filename, mimetype) - if fp: - # read in at a fixed block size, try to - # conserve memory. If this doesn't work - # we can make doc.fields a generator - while True: - chunk = fp.read(2048) - if not chunk: break - doc.fields.append(secore.Field('fulltext', chunk)) + for item in data: + uid, vid, doc, operation, filestuff = item + + if operation is DELETE: + self.write_index.delete(uid) + logger.info("deleted content %s" % (uid,)) + elif operation is UPDATE: + # Here we handle the conversion of binary + # documents to plain text for indexing. This is + # done in the thread to keep things async and + # latency lower. + # we know that there is filestuff or it + # wouldn't have been queued + filename, mimetype = filestuff + if isinstance(filename, file): + filename = filename.name + if filename and not os.path.exists(filename): + # someone removed the file before + # indexing + # or the path is broken + logger.warning("Expected file for" + " indexing at %s. Not" + " Found" % filename) - self.write_index.replace(doc) - logger.info("update file content %s:%s" % (uid, vid)) + fp = converter(filename, mimetype) + if fp: + # read in at a fixed block size, try to + # conserve memory. If this doesn't work + # we can make doc.fields a generator + while True: + chunk = fp.read(2048) + if not chunk: break + doc.fields.append(secore.Field('fulltext', chunk)) + + self.write_index.replace(doc) + if versions and not inplace: + # we know the source file is ours + # to remove + os.unlink(filename) + + logger.info("update file content %s:%s" % (uid, vid)) + else: + logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) else: - logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) - else: - logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) - + logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) + # tell the queue its complete self.queue.task_done() - # we do flush on each record now + # we do flush on each record (or set for enque + # sequences) now self.flush() except: logger.exception("Error in indexer") @@ -267,34 +298,63 @@ class IndexManager(object): return d + @property + def versions(self): + if self.backingstore: + return "versions" in self.backingstore.capabilities + return False + + @property + def inplace(self): + if self.backingstore: + return "inplace" in self.backingstore.capabilities + return False + + def index(self, props, filename=None): """Index the content of an object. Props must contain the following: key -> Property() """ operation = UPDATE + # # Version handling # - # we implicitly create new versions of documents the version - # id should have been set by the higher level system + # are we doing any special handling for versions? uid = props.pop('uid', None) - vid = props.pop('vid', None) + if not uid: uid = create_uid() operation = CREATE - if vid: vid = str(float(vid) + 1.0) - else: vid = "1.0" # Property mapping via model - props = self._mapProperties(props) doc = secore.UnprocessedDocument() add = doc.fields.append - fp = None + + vid = None + if self.versions: + vid = props.get("vid") + if not vid: + self.warn("Didn't supply needed versioning information" + " on a backingstore which performs versioning") + # each versions id is unique when using a versioning store + doc.id = create_uid() + else: + doc.id = uid + + if not vid: vid = '1' + # on non-versioning stores this is redundant but on versioning + # stores it reference to the objects whole timeline + props['uid'] = uid + props['vid'] = vid + + props = self._mapProperties(props) + filestuff = None if filename: # enque async file processing @@ -302,11 +362,11 @@ class IndexManager(object): # and open fp? mimetype = props.get("mime_type") mimetype = mimetype and mimetype.value or 'text/plain' + + filename = os.path.abspath(filename) filestuff = (filename, mimetype) - doc.id = uid - add(secore.Field('vid', vid)) - + # # Property indexing for k, prop in props.iteritems(): @@ -322,13 +382,26 @@ class IndexManager(object): # queue the document for processing self.enque(uid, vid, doc, operation, filestuff) - return uid + return doc.id def get(self, uid): doc = self.read_index.get_document(uid) if not doc: raise KeyError(uid) return model.Content(doc, self.backingstore, self.datamodel) + def get_by_uid_prop(self, uid, rev=None): + # versioning stores fetch objects by uid + # when rev is passed only that particular rev is returne + ri = self.read_index + q = ri.query_field('uid', uid) + if rev: + q = ri.query_filter(q, ri.query_field('vid', str(rev))) + results, count = self._search(q, 0, 1000) + + return results, count + + + def delete(self, uid): # does this need queuing? # the higher level abstractions have to handle interaction @@ -381,8 +454,11 @@ class IndexManager(object): q = ri.query_composite(ri.OP_AND, queries) else: q = self.parse_query(query) - - results = ri.search(q, start_index, end_index) + + return self._search(q, start_index, end_index) + + def _search(self, q, start_index, end_index): + results = self.read_index.search(q, start_index, end_index) count = results.matches_estimated # map the result set to model.Content items @@ -423,6 +499,12 @@ class IndexManager(object): # 'title:foo' match a document whose title contains 'foo' # 'title:"A tale of two datastores"' exact title match # '-this that' match that w/o this + + # limited support for wildcard searches + qp = _xapian.QueryParser + + flags = (qp.FLAG_LOVEHATE) + ri = self.read_index start = 0 end = len(query) @@ -450,11 +532,35 @@ class IndexManager(object): #XXX: strip quotes or not here #word = query[orig+1:qm.end(1)-1] word = query[orig:qm.end(1)] + # this is a phase modify the flags + flags |= qp.FLAG_PHRASE start = qm.end(1) + 1 if field: queries.append(ri.query_field(field, word)) else: - queries.append(ri.query_parse(word)) + if word.endswith("*"): + flags |= qp.FLAG_WILDCARD + q = self._query_parse(word, flags) + + queries.append(q) q = ri.query_composite(ri.OP_AND, queries) return q + + + + def _query_parse(self, word, flags=0, op=None): + # while newer secore do pass flags it doesn't allow control + # over them at the API level. We override here to support + # adding wildcard searching + ri = self.read_index + if op is None: op = ri.OP_AND + qp = ri._prepare_queryparser(None, None, op) + try: + return qp.parse_query(word, flags) + except _xapian.QueryParserError, e: + # If we got a parse error, retry without boolean operators (since + # these are the usual cause of the parse error). + return qp.parse_query(string, 0) + + |