Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary   ·   refs   ·   log   ·   tree   ·   commit   ·   diff   ·   stats
diff options
context:
space:
mode:
author Benjamin Saller <bcsaller@objectrealms.net> 2007-07-15 21:12:09 (GMT)
committer Benjamin Saller <bcsaller@objectrealms.net> 2007-07-15 21:12:09 (GMT)
commit 22d56e51057df1c686b456b7bbd5d893985ed83b (patch)
tree 55eb7713b053eb755661306ae3f4f16814f8746f
parent 40d24e04731d71795cce397ef01f97bb261eaf88 (diff)
This should do the indexing of file content asynchronously; the thread synchronization issues seem to have been resolved.
-rw-r--r-- src/olpc/datastore/xapianindex.py 72
1 files changed, 35 insertions, 37 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index db4cb34..d455013 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -125,6 +125,23 @@ class IndexManager(object):
def enque(self, uid, vid, doc, operation, filestuff=None):
+ # here we implement the sync/async policy
+ # we want to take create/update operations and
+ # set theproperties right away, the
+ # conversion/fulltext indexing can
+ # happen in the thread
+ if operation in (CREATE, UPDATE):
+ with self._write_lock:
+ if operation is CREATE: self.write_index.add(doc)
+ elif operation is UPDATE: self.write_index.replace(doc)
+ self.flush()
+ # now change CREATE to UPDATE as we set the
+ # properties already
+ operation = UPDATE
+ if not filestuff:
+ # In this case we are done
+ return
+
self.queue.put((uid, vid, doc, operation, filestuff))
def indexThread(self):
@@ -138,57 +155,38 @@ class IndexManager(object):
# if this is a non-issue we can simply allow it to block
try:
uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5)
+ except Empty:
+ continue
+
+ try:
with self._write_lock:
if operation is DELETE:
self.write_index.delete(uid)
logger.info("Deleted Content %s" % (uid,))
- elif operation in (CREATE, UPDATE):
+ elif operation is UPDATE:
# Here we handle the conversion of binary
# documents to plain text for indexing. This is
# done in the thread to keep things async and
# latency lower.
- if filestuff:
- filename, mimetype = filestuff
- fp = converter(filename, mimetype)
- if fp:
- doc.fields.append(secore.Field('fulltext', fp.read()))
-
- if operation is CREATE: self.write_index.add(doc)
- elif operation is UPDATE: self.write_index.replace(doc)
- logger.info("Indexed Content %s:%s" % (uid, vid))
+ # we know that there is filestuff or it
+ # wouldn't have been queued
+ filename, mimetype = filestuff
+ fp = converter(filename, mimetype)
+ if fp:
+ doc.fields.append(secore.Field('fulltext', fp.read()))
+ self.write_index.replace(doc)
+ logger.info("Update Content %s:%s" % (uid, vid))
+ else:
+ logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
else:
- logger.warning("Unknown indexer operation ( %s: %s)" % \
- (uid, operation))
- continue
-
- # we do flush on each record now
+ logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
+ # we do flush on each record now
self.flush()
-
-
- # but we still tell the queue its complete
+ # tell the queue its complete
self.queue.task_done()
-
- except Empty:
- pass
except:
logger.exception("Error in indexer")
-## except:
-## import traceback
-## traceback.print_exc()
-## try: self.write_index.close()
-## except: pass
-## try:
-## self.write_index = secore.IndexerConnection(self.repo)
-## self.read_index.reopen()
-## except:
-## # Shut down the indexer
-## logger.critical("Indexer Failed, Shutting it down")
-## self.indexer_running = False
-
-
-
-
def complete_indexing(self):
"""Intentionally block until the indexing is complete. Used