diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-15 21:12:09 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-15 21:12:09 (GMT) |
commit | 22d56e51057df1c686b456b7bbd5d893985ed83b (patch) | |
tree | 55eb7713b053eb755661306ae3f4f16814f8746f /src | |
parent | 40d24e04731d71795cce397ef01f97bb261eaf88 (diff) |
this should do the indexing of file content async, the thread sync issues seem to have been resolved
Diffstat (limited to 'src')
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 72 |
1 files changed, 35 insertions, 37 deletions
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index db4cb34..d455013 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -125,6 +125,23 @@ class IndexManager(object): def enque(self, uid, vid, doc, operation, filestuff=None): + # here we implement the sync/async policy + # we want to take create/update operations and + # set theproperties right away, the + # conversion/fulltext indexing can + # happen in the thread + if operation in (CREATE, UPDATE): + with self._write_lock: + if operation is CREATE: self.write_index.add(doc) + elif operation is UPDATE: self.write_index.replace(doc) + self.flush() + # now change CREATE to UPDATE as we set the + # properties already + operation = UPDATE + if not filestuff: + # In this case we are done + return + self.queue.put((uid, vid, doc, operation, filestuff)) def indexThread(self): @@ -138,57 +155,38 @@ class IndexManager(object): # if this is a non-issue we can simply allow it to block try: uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5) + except Empty: + continue + + try: with self._write_lock: if operation is DELETE: self.write_index.delete(uid) logger.info("Deleted Content %s" % (uid,)) - elif operation in (CREATE, UPDATE): + elif operation is UPDATE: # Here we handle the conversion of binary # documents to plain text for indexing. This is # done in the thread to keep things async and # latency lower. - if filestuff: - filename, mimetype = filestuff - fp = converter(filename, mimetype) - if fp: - doc.fields.append(secore.Field('fulltext', fp.read())) - - if operation is CREATE: self.write_index.add(doc) - elif operation is UPDATE: self.write_index.replace(doc) - logger.info("Indexed Content %s:%s" % (uid, vid)) + # we know that there is filestuff or it + # wouldn't have been queued + filename, mimetype = filestuff + fp = converter(filename, mimetype) + if fp: + doc.fields.append(secore.Field('fulltext', fp.read())) + self.write_index.replace(doc) + logger.info("Update Content %s:%s" % (uid, vid)) + else: + logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) else: - logger.warning("Unknown indexer operation ( %s: %s)" % \ - (uid, operation)) - continue - - # we do flush on each record now + logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) + # we do flush on each record now self.flush() - - - # but we still tell the queue its complete + # tell the queue its complete self.queue.task_done() - - except Empty: - pass except: logger.exception("Error in indexer") -## except: -## import traceback -## traceback.print_exc() -## try: self.write_index.close() -## except: pass -## try: -## self.write_index = secore.IndexerConnection(self.repo) -## self.read_index.reopen() -## except: -## # Shut down the indexer -## logger.critical("Indexer Failed, Shutting it down") -## self.indexer_running = False - - - - def complete_indexing(self): """Intentionally block until the indexing is complete. Used |