Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary · refs · log · tree · commit · diff · stats
diff options
context:
space:
mode:
author Benjamin Saller <bcsaller@objectrealms.net>2007-07-13 18:52:13 (GMT)
committer Benjamin Saller <bcsaller@objectrealms.net>2007-07-13 18:52:13 (GMT)
commit 40d24e04731d71795cce397ef01f97bb261eaf88 (patch)
tree ff4b05226762e3e79f9646feb0cea15a637053a7
parent 07c534f2ae87736f8f6f70c27ea4dd9ca5ddba98 (diff)
write locking to protect the connection
-rw-r--r-- src/olpc/datastore/xapianindex.py 65
1 files changed, 36 insertions, 29 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index 7cb54b6..db4cb34 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -4,6 +4,7 @@ xapianindex
maintain indexes on content
"""
+from __future__ import with_statement
__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
__docformat__ = 'restructuredtext'
@@ -11,6 +12,7 @@ __copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__ = 'The GNU Public License V2+'
+
from Queue import Queue, Empty
import logging
import re
@@ -66,7 +68,7 @@ class IndexManager(object):
self.backingstore = None
self.fields = set()
-
+ self._write_lock = threading.Lock()
#
# Initialization
def connect(self, repo, **kwargs):
@@ -95,11 +97,6 @@ class IndexManager(object):
# signal from backingstore that its our parent
self.backingstore = backingstore
- # flow control
- def flush(self):
- """Called after any database mutation"""
- self.write_index.flush()
- self.read_index.reopen()
def stop(self):
self.stopIndexer()
@@ -119,6 +116,14 @@ class IndexManager(object):
self.indexer_running = False
self.indexer.join()
+ # flow control
+ def flush(self):
+ """Called after any database mutation"""
+ with self._write_lock:
+ self.write_index.flush()
+ self.read_index.reopen()
+
+
def enque(self, uid, vid, doc, operation, filestuff=None):
self.queue.put((uid, vid, doc, operation, filestuff))
@@ -133,31 +138,33 @@ class IndexManager(object):
# if this is a non-issue we can simply allow it to block
try:
uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5)
- if operation is DELETE: self.write_index.delete(uid)
- elif operation in (CREATE, UPDATE):
- # Here we handle the conversion of binary
- # documents to plain text for indexing. This is
- # done in the thread to keep things async and
- # latency lower.
- if filestuff:
- filename, mimetype = filestuff
- fp = converter(filename, mimetype)
- if fp:
- doc.fields.append(secore.Field('fulltext',
- fp.read()))
-
- if operation is CREATE: self.write_index.add(doc)
- elif operation is UPDATE: self.write_index.replace(doc)
-
- else:
- logger.warning("Unknown indexer operation ( %s: %s)" % \
- (uid, operation))
- continue
-
- # we do flush on each record now
+ with self._write_lock:
+ if operation is DELETE:
+ self.write_index.delete(uid)
+ logger.info("Deleted Content %s" % (uid,))
+ elif operation in (CREATE, UPDATE):
+ # Here we handle the conversion of binary
+ # documents to plain text for indexing. This is
+ # done in the thread to keep things async and
+ # latency lower.
+ if filestuff:
+ filename, mimetype = filestuff
+ fp = converter(filename, mimetype)
+ if fp:
+ doc.fields.append(secore.Field('fulltext', fp.read()))
+
+ if operation is CREATE: self.write_index.add(doc)
+ elif operation is UPDATE: self.write_index.replace(doc)
+ logger.info("Indexed Content %s:%s" % (uid, vid))
+ else:
+ logger.warning("Unknown indexer operation ( %s: %s)" % \
+ (uid, operation))
+ continue
+
+ # we do flush on each record now
self.flush()
- logger.info("Indexed Content %s:%s" % (uid, vid))
+
# but we still tell the queue its complete
self.queue.task_done()