Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Saller <bcsaller@objectrealms.net>2007-10-05 17:00:51 (GMT)
committer Benjamin Saller <bcsaller@objectrealms.net>2007-10-05 17:00:51 (GMT)
commit46463aee1ff3af53ddfe40559706b503d3c2b267 (patch)
tree4028ceeecfc83f3e1a30922e3550ce97444acd3b
parent902883ab7c17c4bc4f31ad492902dae6ef01e1ce (diff)
single xapian connection (we might revert this but it manages fewer flushes to disk)
offset support and pass through for the single mountpoint fastpath
-rw-r--r--src/olpc/datastore/backingstore.py4
-rw-r--r--src/olpc/datastore/datastore.py16
-rw-r--r--src/olpc/datastore/xapianindex.py29
3 files changed, 33 insertions, 16 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 1e82cbc..04ef20e 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -558,9 +558,9 @@ class FileBackingStore(BackingStore):
return self.indexmanager.get_uniquevaluesfor(propertyname)
- def find(self, query, order_by=None, limit=None):
+ def find(self, query, order_by=None, limit=None, offset=0):
if not limit: limit = 4069
- return self.indexmanager.search(query, end_index=limit, order_by=order_by)
+ return self.indexmanager.search(query, start_index=offset, end_index=limit, order_by=order_by)
def stop(self):
self.indexmanager.stop()
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index ba0b077..3f1c632 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -217,17 +217,19 @@ class DataStore(dbus.service.Object):
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Created(self, uid): pass
- def _single_search(self, mountpoint, query, order_by, limit):
- results, count = mountpoint.find(query.copy(), order_by, limit)
+ def _single_search(self, mountpoint, query, order_by, limit, offset):
+ results, count = mountpoint.find(query.copy(), order_by,
+ limit, offset)
return list(results), count, 1
- def _multiway_search(self, query, order_by=None, limit=None):
+ def _multiway_search(self, query, order_by=None, limit=None, offset=None):
mountpoints = query.pop('mountpoints', self.mountpoints)
mountpoints = [self.mountpoints[str(m)] for m in mountpoints]
if len(mountpoints) == 1:
# Fast path the single mountpoint case
- return self._single_search(mountpoints[0], query, order_by, limit)
+ return self._single_search(mountpoints[0], query,
+ order_by, limit, offset)
results = []
# XXX: the merge will become *much* more complex in when
@@ -304,7 +306,9 @@ class DataStore(dbus.service.Object):
# backends may be able to return sorted results, if there is
# only a single backend in the query we can use pre-sorted
# results directly
- results, count, results_from = self._multiway_search(kwargs, order_by, limit)
+ results, count, results_from = self._multiway_search(kwargs,
+ order_by,
+ limit, offset)
# ordering is difficult when we are dealing with sets from
@@ -343,6 +347,8 @@ class DataStore(dbus.service.Object):
r = results.values()
r.sort(comparator)
results = r
+
+ results = results[offset:limit+offset]
else:
results = results.values()
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index d170ace..5847957 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -39,6 +39,13 @@ CREATE = 1
UPDATE = 2
DELETE = 3
+# Force a flush every _n_ changes to the db
+THRESHOLD = 128
+
+class ReadWriteConnection(secore.indexerconnection.IndexerConnection,
+ secore.searchconnection.SearchConnection):
+ # has search methods on a write connection
+ pass
class ContentMappingIter(object):
"""An iterator over a set of results from a search.
@@ -74,6 +81,8 @@ class IndexManager(object):
self.fields = set()
self._write_lock = threading.Lock()
+ self.deltact = 0 # delta count
+
#
# Initialization
def connect(self, repo, **kwargs):
@@ -82,7 +91,7 @@ class IndexManager(object):
RuntimeWarning)
self.repo = repo
- self.write_index = secore.IndexerConnection(repo)
+ self.write_index = ReadWriteConnection(repo)
# configure the database according to the model
datamodel = kwargs.get('model', model.defaultModel)
@@ -91,9 +100,9 @@ class IndexManager(object):
# store a reference
self.datamodel = datamodel
- self.read_index = secore.SearchConnection(repo)
+ self.read_index = self.write_index
- self.flush()
+ self.flush(force=True)
# by default we start the indexer now
self.startIndexer()
assert self.indexer.isAlive()
@@ -107,7 +116,7 @@ class IndexManager(object):
def stop(self, force=False):
self.stopIndexer(force)
self.write_index.close()
- self.read_index.close()
+ #self.read_index.close()
# XXX: work around for xapian not having close() this will
# change in the future in the meantime we delete the
# references to the indexers and then force the gc() to run
@@ -132,12 +141,14 @@ class IndexManager(object):
self.indexer.join()
# flow control
- def flush(self):
+ def flush(self, force=False):
"""Called after any database mutation"""
- with self._write_lock:
- self.write_index.flush()
- self.read_index.reopen()
-
+ self.deltact += 1
+ if force or self.deltact > THRESHOLD:
+ with self._write_lock:
+ self.write_index.flush()
+ #self.read_index.reopen()
+ self.deltact = 0
def enque(self, uid, vid, doc, operation, filestuff=None):
# here we implement the sync/async policy