Diffstat (limited to 'src/olpc/datastore/xapianindex.py')
-rw-r--r--  src/olpc/datastore/xapianindex.py  198
1 files changed, 152 insertions, 46 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index d46e0c7..5f29feb 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -15,6 +15,7 @@ __license__ = 'The GNU Public License V2+'
from Queue import Queue, Empty
import logging
+import os
import re
import sys
import time
@@ -23,6 +24,7 @@ import threading
import warnings
import secore
+import xapian as _xapian # we need to modify the QueryParser
from olpc.datastore import model
from olpc.datastore.converter import converter
@@ -127,7 +129,12 @@ class IndexManager(object):
self.write_index.flush()
self.read_index.reopen()
-
+ def enqueSequence(self, commands):
+ """Takes a sequence of arguments to the normal enque function
+ and executes them under a single lock/flush cycle
+ """
+ self.queue.put(commands)
+
def enque(self, uid, vid, doc, operation, filestuff=None):
# here we implement the sync/async policy
# we want to take create/update operations and
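
A hypothetical usage sketch for the new enqueSequence: each tuple mirrors
enque()'s (uid, vid, doc, operation, filestuff) arguments, and the whole
batch is applied under a single write-lock/flush cycle. The names im, doc1,
uid1 and so on are illustrative, not part of the patch.

    im.enqueSequence([
        (uid1, vid1, doc1, UPDATE, ('/tmp/a.txt', 'text/plain')),  # reindex a file
        (uid2, vid2, None, DELETE, None),                          # drop a record
    ])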
@@ -162,52 +169,76 @@ class IndexManager(object):
# from the item in the repo as that will become our immutable
# copy. Detect those cases and use the internal filename
# property or backingstore._translatePath to get at it
+ versions = self.versions
+ inplace = self.inplace
+
while self.indexer_running:
# include timeout here to ease shutdown of the thread
# if this is a non-issue we can simply allow it to block
try:
data = self.queue.get(True, 0.025)
- uid, vid, doc, operation, filestuff = data
+ # when we enque a sequence of commands they are all
+ # processed in one pass through the loop under a single
+ # write lock, so the changes become visible at once
+
+ if not isinstance(data[0], (list, tuple)):
+ data = (data,)
except Empty:
- #time.sleep(1.0)
continue
try:
with self._write_lock:
- if operation is DELETE:
- self.write_index.delete(uid)
- logger.info("deleted content %s" % (uid,))
- elif operation is UPDATE:
- # Here we handle the conversion of binary
- # documents to plain text for indexing. This is
- # done in the thread to keep things async and
- # latency lower.
- # we know that there is filestuff or it
- # wouldn't have been queued
- filename, mimetype = filestuff
- if isinstance(filename, file):
- filename = filename.name
- fp = converter(filename, mimetype)
- if fp:
- # read in at a fixed block size, try to
- # conserve memory. If this doesn't work
- # we can make doc.fields a generator
- while True:
- chunk = fp.read(2048)
- if not chunk: break
- doc.fields.append(secore.Field('fulltext', chunk))
+ for item in data:
+ uid, vid, doc, operation, filestuff = item
+
+ if operation is DELETE:
+ self.write_index.delete(uid)
+ logger.info("deleted content %s" % (uid,))
+ elif operation is UPDATE:
+ # Here we handle the conversion of binary
+ # documents to plain text for indexing. This is
+ # done in the thread to keep things async and
+ # latency lower.
+ # we know that there is filestuff or it
+ # wouldn't have been queued
+ filename, mimetype = filestuff
+ if isinstance(filename, file):
+ filename = filename.name
+ if filename and not os.path.exists(filename):
+ # someone removed the file before
+ # indexing
+ # or the path is broken
+ logger.warning("Expected file for"
+ " indexing at %s. Not"
+ " found" % filename)
- self.write_index.replace(doc)
- logger.info("update file content %s:%s" % (uid, vid))
+ fp = converter(filename, mimetype)
+ if fp:
+ # read in at a fixed block size, try to
+ # conserve memory. If this doesn't work
+ # we can make doc.fields a generator
+ while True:
+ chunk = fp.read(2048)
+ if not chunk: break
+ doc.fields.append(secore.Field('fulltext', chunk))
+
+ self.write_index.replace(doc)
+ if versions and not inplace:
+ # we know the source file is ours
+ # to remove
+ os.unlink(filename)
+
+ logger.info("update file content %s:%s" % (uid, vid))
+ else:
+ logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
else:
- logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
- else:
- logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
-
+ logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
+
# tell the queue its complete
self.queue.task_done()
- # we do flush on each record now
+ # we now flush on each record (or on each batch, for
+ # enqueSequence)
self.flush()
except:
logger.exception("Error in indexer")
@@ -267,34 +298,63 @@ class IndexManager(object):
return d
+ @property
+ def versions(self):
+ if self.backingstore:
+ return "versions" in self.backingstore.capabilities
+ return False
+
+ @property
+ def inplace(self):
+ if self.backingstore:
+ return "inplace" in self.backingstore.capabilities
+ return False
+
+
def index(self, props, filename=None):
"""Index the content of an object.
Props must contain the following:
key -> Property()
"""
operation = UPDATE
+
#
# Version handling
#
- # we implicitly create new versions of documents the version
- # id should have been set by the higher level system
+ # are we doing any special handling for versions?
uid = props.pop('uid', None)
- vid = props.pop('vid', None)
+
if not uid:
uid = create_uid()
operation = CREATE
- if vid: vid = str(float(vid) + 1.0)
- else: vid = "1.0"
# Property mapping via model
- props = self._mapProperties(props)
doc = secore.UnprocessedDocument()
add = doc.fields.append
- fp = None
+
+ vid = None
+ if self.versions:
+ vid = props.get("vid")
+ if not vid:
+ self.warn("Didn't supply needed versioning information"
+ " on a backingstore which performs versioning")
+ # each version's id is unique when using a versioning store
+ doc.id = create_uid()
+ else:
+ doc.id = uid
+
+ if not vid: vid = '1'
+ # on non-versioning stores this is redundant but on versioning
+ # stores it refers to the object's whole timeline
+ props['uid'] = uid
+ props['vid'] = vid
+
+ props = self._mapProperties(props)
+
filestuff = None
if filename:
# enque async file processing
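
A minimal sketch of the contract the new versions/inplace properties assume:
the backingstore advertises a capabilities collection, and on a versioning
store index() keys each revision by a fresh uid while the 'uid' property
ties the revisions into one timeline. The store class here is hypothetical.

    class VersioningStore(object):
        # hypothetical backingstore advertising both capabilities
        capabilities = ("versions", "inplace")

    im.backingstore = VersioningStore()
    assert im.versions and im.inplace   # both properties now report True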
@@ -302,11 +362,11 @@ class IndexManager(object):
# and open fp?
mimetype = props.get("mime_type")
mimetype = mimetype and mimetype.value or 'text/plain'
+
+ filename = os.path.abspath(filename)
filestuff = (filename, mimetype)
- doc.id = uid
- add(secore.Field('vid', vid))
-
+
#
# Property indexing
for k, prop in props.iteritems():
@@ -322,13 +382,26 @@ class IndexManager(object):
# queue the document for processing
self.enque(uid, vid, doc, operation, filestuff)
- return uid
+ return doc.id
def get(self, uid):
doc = self.read_index.get_document(uid)
if not doc: raise KeyError(uid)
return model.Content(doc, self.backingstore, self.datamodel)
+ def get_by_uid_prop(self, uid, rev=None):
+ # versioning stores fetch objects by uid
+ # when rev is passed only that particular rev is returned
+ ri = self.read_index
+ q = ri.query_field('uid', uid)
+ if rev:
+ q = ri.query_filter(q, ri.query_field('vid', str(rev)))
+ results, count = self._search(q, 0, 1000)
+
+ return results, count
+
+
+
def delete(self, uid):
# does this need queuing?
# the higher level abstractions have to handle interaction
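
A hypothetical call sketch for get_by_uid_prop (im is again illustrative);
it returns the same (results, count) pair as find(), capped at 1000 matches.

    results, count = im.get_by_uid_prop(uid)           # every revision of the object
    results, count = im.get_by_uid_prop(uid, rev='2')  # only revision '2', via the vid field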
@@ -381,8 +454,11 @@ class IndexManager(object):
q = ri.query_composite(ri.OP_AND, queries)
else:
q = self.parse_query(query)
-
- results = ri.search(q, start_index, end_index)
+
+ return self._search(q, start_index, end_index)
+
+ def _search(self, q, start_index, end_index):
+ results = self.read_index.search(q, start_index, end_index)
count = results.matches_estimated
# map the result set to model.Content items
@@ -423,6 +499,12 @@ class IndexManager(object):
# 'title:foo' match a document whose title contains 'foo'
# 'title:"A tale of two datastores"' exact title match
# '-this that' match that w/o this
+
+ # limited support for wildcard searches
+ qp = _xapian.QueryParser
+
+ flags = (qp.FLAG_LOVEHATE)
+
ri = self.read_index
start = 0
end = len(query)
@@ -450,11 +532,35 @@ class IndexManager(object):
#XXX: strip quotes or not here
#word = query[orig+1:qm.end(1)-1]
word = query[orig:qm.end(1)]
+ # this is a phrase; modify the flags
+ flags |= qp.FLAG_PHRASE
start = qm.end(1) + 1
if field:
queries.append(ri.query_field(field, word))
else:
- queries.append(ri.query_parse(word))
+ if word.endswith("*"):
+ flags |= qp.FLAG_WILDCARD
+ q = self._query_parse(word, flags)
+
+ queries.append(q)
q = ri.query_composite(ri.OP_AND, queries)
return q
+
+
+
+ def _query_parse(self, word, flags=0, op=None):
+ # while newer secore releases do pass flags, they don't allow
+ # control over them at the API level. We override here to
+ # support wildcard searching
+ ri = self.read_index
+ if op is None: op = ri.OP_AND
+ qp = ri._prepare_queryparser(None, None, op)
+ try:
+ return qp.parse_query(word, flags)
+ except _xapian.QueryParserError, e:
+ # If we got a parse error, retry without boolean operators (since
+ # these are the usual cause of the parse error).
+ return qp.parse_query(word, 0)
+
+
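
For reference, a standalone sketch of the wildcard behaviour _query_parse
enables, written against the raw xapian bindings rather than secore; the
index path is illustrative. xapian releases of this era expand the wildcard
at parse time, which is why the parser needs a database to consult.

    import xapian

    db = xapian.Database('/path/to/index')   # illustrative location
    qp = xapian.QueryParser()
    qp.set_database(db)                      # required for wildcard expansion

    flags = qp.FLAG_LOVEHATE                 # same base flags as parse_query above
    word = 'data*'
    if word.endswith('*'):
        flags |= qp.FLAG_WILDCARD            # widen only when a '*' is present
    query = qp.parse_query(word, flags)

    enquire = xapian.Enquire(db)
    enquire.set_query(query)
    matches = enquire.get_mset(0, 10)        # first ten matches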