Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary · refs · log · tree · commit · diff · stats
diff options
context:
space:
mode:
author Tomeu Vizoso <tomeu@tomeuvizoso.net> 2007-10-31 11:11:18 (GMT)
committer Tomeu Vizoso <tomeu@tomeuvizoso.net> 2007-10-31 11:11:18 (GMT)
commit 6bb99669eb0606c56ee195473006e219cedb9e81 (patch)
tree 83260e85c510537ed16302f0bbbe01a30306caa9
parent 8f8bbe73fdc61ce3c1716bfc57a21eef7cb2d7e6 (diff)
#4182: Flush every 20 changes or 1 minute after the last unflushed change.
-rw-r--r-- NEWS 1
-rw-r--r-- src/olpc/datastore/xapianindex.py 27
2 files changed, 25 insertions, 3 deletions
diff --git a/NEWS b/NEWS
index 756bf1e..a519018 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,4 @@
+* #4182: Flush every 20 changes or 1 minute after the last unflushed change. (tomeu)
* #4083: Remove the active loop and check usb sticks on idle. (tomeu)
Snapshot 5051fc3a4b
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index 82e8644..06041aa 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -25,6 +25,7 @@ import warnings
import secore
import xapian as _xapian # we need to modify the QueryParser
+import gobject
from olpc.datastore import model
from olpc.datastore.converter import converter
@@ -40,7 +41,10 @@ UPDATE = 2
DELETE = 3
# Force a flush every _n_ changes to the db
-THRESHOLD = 128
+FLUSH_THRESHOLD = 20
+
+# Force a flush after _n_ seconds since the last change to the db
+FLUSH_TIMEOUT = 60
class ReadWriteConnection(secore.indexerconnection.IndexerConnection,
secore.searchconnection.SearchConnection):
@@ -81,11 +85,14 @@ class IndexManager(object):
self.fields = set()
self._write_lock = threading.Lock()
+
self.deltact = 0 # delta count
-
+ self._flush_timeout = None
+
#
# Initialization
def connect(self, repo, **kwargs):
+ logging.debug('IndexManager.connect()')
if self.write_index is not None:
warnings.warn('''Requested redundant connect to index''',
RuntimeWarning)
@@ -114,6 +121,7 @@ class IndexManager(object):
def stop(self, force=False):
+ logging.debug('IndexManager.stop()')
self.flush(force=True)
self.stopIndexer(force)
self.write_index.close()
@@ -141,15 +149,28 @@ class IndexManager(object):
# should terminate after the current task
self.indexer.join()
+ def _flush_timeout_cb(self):
+ self.flush(True)
+ return False
+
# flow control
def flush(self, force=False):
+ logging.debug('IndexManager.flush: %r %r' % (force, self.deltact))
"""Called after any database mutation"""
+
+ if self._flush_timeout is not None:
+ gobject.source_remove(self._flush_timeout)
+ self._flush_timeout = None
+
self.deltact += 1
- if force or self.deltact > THRESHOLD:
+ if force or self.deltact > FLUSH_THRESHOLD:
with self._write_lock:
self.write_index.flush()
#self.read_index.reopen()
self.deltact = 0
+ else:
+ self._flush_timeout = gobject.timeout_add(FLUSH_TIMEOUT * 1000,
+ self._flush_timeout_cb)
def enque(self, uid, vid, doc, operation, filestuff=None):
# here we implement the sync/async policy