diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-13 18:52:13 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-13 18:52:13 (GMT) |
commit | 40d24e04731d71795cce397ef01f97bb261eaf88 (patch) | |
tree | ff4b05226762e3e79f9646feb0cea15a637053a7 | |
parent | 07c534f2ae87736f8f6f70c27ea4dd9ca5ddba98 (diff) |
write locking to protect the connection
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 65 |
1 files changed, 36 insertions, 29 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index 7cb54b6..db4cb34 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -4,6 +4,7 @@ xapianindex maintain indexes on content """ +from __future__ import with_statement __author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' __docformat__ = 'restructuredtext' @@ -11,6 +12,7 @@ __copyright__ = 'Copyright ObjectRealms, LLC, 2007' __license__ = 'The GNU Public License V2+' + from Queue import Queue, Empty import logging import re @@ -66,7 +68,7 @@ class IndexManager(object): self.backingstore = None self.fields = set() - + self._write_lock = threading.Lock() # # Initialization def connect(self, repo, **kwargs): @@ -95,11 +97,6 @@ class IndexManager(object): # signal from backingstore that its our parent self.backingstore = backingstore - # flow control - def flush(self): - """Called after any database mutation""" - self.write_index.flush() - self.read_index.reopen() def stop(self): self.stopIndexer() @@ -119,6 +116,14 @@ class IndexManager(object): self.indexer_running = False self.indexer.join() + # flow control + def flush(self): + """Called after any database mutation""" + with self._write_lock: + self.write_index.flush() + self.read_index.reopen() + + def enque(self, uid, vid, doc, operation, filestuff=None): self.queue.put((uid, vid, doc, operation, filestuff)) @@ -133,31 +138,33 @@ class IndexManager(object): # if this is a non-issue we can simply allow it to block try: uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5) - if operation is DELETE: self.write_index.delete(uid) - elif operation in (CREATE, UPDATE): - # Here we handle the conversion of binary - # documents to plain text for indexing. This is - # done in the thread to keep things async and - # latency lower. - if filestuff: - filename, mimetype = filestuff - fp = converter(filename, mimetype) - if fp: - doc.fields.append(secore.Field('fulltext', - fp.read())) - - if operation is CREATE: self.write_index.add(doc) - elif operation is UPDATE: self.write_index.replace(doc) - - else: - logger.warning("Unknown indexer operation ( %s: %s)" % \ - (uid, operation)) - continue - - # we do flush on each record now + with self._write_lock: + if operation is DELETE: + self.write_index.delete(uid) + logger.info("Deleted Content %s" % (uid,)) + elif operation in (CREATE, UPDATE): + # Here we handle the conversion of binary + # documents to plain text for indexing. This is + # done in the thread to keep things async and + # latency lower. + if filestuff: + filename, mimetype = filestuff + fp = converter(filename, mimetype) + if fp: + doc.fields.append(secore.Field('fulltext', fp.read())) + + if operation is CREATE: self.write_index.add(doc) + elif operation is UPDATE: self.write_index.replace(doc) + logger.info("Indexed Content %s:%s" % (uid, vid)) + else: + logger.warning("Unknown indexer operation ( %s: %s)" % \ + (uid, operation)) + continue + + # we do flush on each record now self.flush() - logger.info("Indexed Content %s:%s" % (uid, vid)) + # but we still tell the queue its complete self.queue.task_done() |