From 6bb99669eb0606c56ee195473006e219cedb9e81 Mon Sep 17 00:00:00 2001 From: Tomeu Vizoso Date: Wed, 31 Oct 2007 11:11:18 +0000 Subject: #4182: Flush every 20 changes or 1 minute after the last unflushed change. --- diff --git a/NEWS b/NEWS index 756bf1e..a519018 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,4 @@ +* #4182: Flush every 20 changes or 1 minute after the last unflushed change. (tomeu) * #4083: Remove the active loop and check usb sticks on idle. (tomeu) Snapshot 5051fc3a4b diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index 82e8644..06041aa 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -25,6 +25,7 @@ import warnings import secore import xapian as _xapian # we need to modify the QueryParser +import gobject from olpc.datastore import model from olpc.datastore.converter import converter @@ -40,7 +41,10 @@ UPDATE = 2 DELETE = 3 # Force a flush every _n_ changes to the db -THRESHOLD = 128 +FLUSH_THRESHOLD = 20 + +# Force a flush after _n_ seconds since the last change to the db +FLUSH_TIMEOUT = 60 class ReadWriteConnection(secore.indexerconnection.IndexerConnection, secore.searchconnection.SearchConnection): @@ -81,11 +85,14 @@ class IndexManager(object): self.fields = set() self._write_lock = threading.Lock() + self.deltact = 0 # delta count - + self._flush_timeout = None + # # Initialization def connect(self, repo, **kwargs): + logging.debug('IndexManager.connect()') if self.write_index is not None: warnings.warn('''Requested redundant connect to index''', RuntimeWarning) @@ -114,6 +121,7 @@ class IndexManager(object): def stop(self, force=False): + logging.debug('IndexManager.stop()') self.flush(force=True) self.stopIndexer(force) self.write_index.close() @@ -141,15 +149,28 @@ class IndexManager(object): # should terminate after the current task self.indexer.join() + def _flush_timeout_cb(self): + self.flush(True) + return False + # flow control def flush(self, force=False): + logging.debug('IndexManager.flush: %r %r' % (force, self.deltact)) """Called after any database mutation""" + + if self._flush_timeout is not None: + gobject.source_remove(self._flush_timeout) + self._flush_timeout = None + self.deltact += 1 - if force or self.deltact > THRESHOLD: + if force or self.deltact > FLUSH_THRESHOLD: with self._write_lock: self.write_index.flush() #self.read_index.reopen() self.deltact = 0 + else: + self._flush_timeout = gobject.timeout_add(FLUSH_TIMEOUT * 1000, + self._flush_timeout_cb) def enque(self, uid, vid, doc, operation, filestuff=None): # here we implement the sync/async policy -- cgit v0.9.1