diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-10-05 17:00:51 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-10-05 17:00:51 (GMT) |
commit | 46463aee1ff3af53ddfe40559706b503d3c2b267 (patch) | |
tree | 4028ceeecfc83f3e1a30922e3550ce97444acd3b | |
parent | 902883ab7c17c4bc4f31ad492902dae6ef01e1ce (diff) |
single xapian connection (we might revert this but it manages fewer flushes to disk)
offset support and pass through for the single mountpoint fastpath
-rw-r--r-- | src/olpc/datastore/backingstore.py | 4 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 16 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 29 |
3 files changed, 33 insertions, 16 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 1e82cbc..04ef20e 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -558,9 +558,9 @@ class FileBackingStore(BackingStore): return self.indexmanager.get_uniquevaluesfor(propertyname) - def find(self, query, order_by=None, limit=None): + def find(self, query, order_by=None, limit=None, offset=0): if not limit: limit = 4069 - return self.indexmanager.search(query, end_index=limit, order_by=order_by) + return self.indexmanager.search(query, start_index=offset, end_index=limit, order_by=order_by) def stop(self): self.indexmanager.stop() diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index ba0b077..3f1c632 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -217,17 +217,19 @@ class DataStore(dbus.service.Object): @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Created(self, uid): pass - def _single_search(self, mountpoint, query, order_by, limit): - results, count = mountpoint.find(query.copy(), order_by, limit) + def _single_search(self, mountpoint, query, order_by, limit, offset): + results, count = mountpoint.find(query.copy(), order_by, + limit, offset) return list(results), count, 1 - def _multiway_search(self, query, order_by=None, limit=None): + def _multiway_search(self, query, order_by=None, limit=None, offset=None): mountpoints = query.pop('mountpoints', self.mountpoints) mountpoints = [self.mountpoints[str(m)] for m in mountpoints] if len(mountpoints) == 1: # Fast path the single mountpoint case - return self._single_search(mountpoints[0], query, order_by, limit) + return self._single_search(mountpoints[0], query, + order_by, limit, offset) results = [] # XXX: the merge will become *much* more complex in when @@ -304,7 +306,9 @@ class DataStore(dbus.service.Object): # backends may be able to return sorted results, if there is # only a single backend in the query we can use pre-sorted # results directly - results, count, results_from = self._multiway_search(kwargs, order_by, limit) + results, count, results_from = self._multiway_search(kwargs, + order_by, + limit, offset) # ordering is difficult when we are dealing with sets from @@ -343,6 +347,8 @@ class DataStore(dbus.service.Object): r = results.values() r.sort(comparator) results = r + + results = results[offset:limit+offset] else: results = results.values() diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index d170ace..5847957 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -39,6 +39,13 @@ CREATE = 1 UPDATE = 2 DELETE = 3 +# Force a flush every _n_ changes to the db +THRESHOLD = 128 + +class ReadWriteConnection(secore.indexerconnection.IndexerConnection, + secore.searchconnection.SearchConnection): + # has search methods on a write connection + pass class ContentMappingIter(object): """An iterator over a set of results from a search. @@ -74,6 +81,8 @@ class IndexManager(object): self.fields = set() self._write_lock = threading.Lock() + self.deltact = 0 # delta count + # # Initialization def connect(self, repo, **kwargs): @@ -82,7 +91,7 @@ class IndexManager(object): RuntimeWarning) self.repo = repo - self.write_index = secore.IndexerConnection(repo) + self.write_index = ReadWriteConnection(repo) # configure the database according to the model datamodel = kwargs.get('model', model.defaultModel) @@ -91,9 +100,9 @@ class IndexManager(object): # store a reference self.datamodel = datamodel - self.read_index = secore.SearchConnection(repo) + self.read_index = self.write_index - self.flush() + self.flush(force=True) # by default we start the indexer now self.startIndexer() assert self.indexer.isAlive() @@ -107,7 +116,7 @@ class IndexManager(object): def stop(self, force=False): self.stopIndexer(force) self.write_index.close() - self.read_index.close() + #self.read_index.close() # XXX: work around for xapian not having close() this will # change in the future in the meantime we delete the # references to the indexers and then force the gc() to run @@ -132,12 +141,14 @@ class IndexManager(object): self.indexer.join() # flow control - def flush(self): + def flush(self, force=False): """Called after any database mutation""" - with self._write_lock: - self.write_index.flush() - self.read_index.reopen() - + self.deltact += 1 + if force or self.deltact > THRESHOLD: + with self._write_lock: + self.write_index.flush() + #self.read_index.reopen() + self.deltact = 0 def enque(self, uid, vid, doc, operation, filestuff=None): # here we implement the sync/async policy |