From 68c85fb90cdfcde725095ca3bae5bdfed9d2105e Mon Sep 17 00:00:00 2001
From: Benjamin Saller
Date: Wed, 03 Oct 2007 22:14:46 +0000
Subject: single mount point fast path

---
diff --git a/bin/datastore-service b/bin/datastore-service
index c2a29fa..4186410 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -17,14 +17,14 @@
 if not os.path.exists(repo_dir): os.makedirs(repo_dir)
 log_dir = os.path.join(base_dir, "logs")
 if not os.path.exists(log_dir): os.makedirs(log_dir)
-os.chdir(repo_dir)
+#os.chdir(repo_dir)
 
 # setup logger
 filename = None
 if not sys.stdin.isatty():
     filename = os.path.join(log_dir, "datastore.log")
-logging.basicConfig(level=logging.DEBUG,
+logging.basicConfig(level=logging.WARNING,
                     format="%(asctime)-15s %(levelname)s: %(message)s",
                     filename = filename,
                     )
@@ -85,9 +85,22 @@ def main():
         logger.debug("Datastore shutdown with error",
                      exc_info=sys.exc_info())
 
-main()
+#main()
 
 #import hotshot
 #p = hotshot.Profile('hs.prof')
 #p.run('main()')
+import cProfile
+import lsprofcalltree
+_prof = cProfile.Profile()
+_prof.enable()
+main()
+_prof.disable()
+k = lsprofcalltree.KCacheGrind(_prof)
+fp = open('/tmp/ds.kgrind', 'w+')
+k.output(fp)
+fp.close()
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index dcb989e..1417658 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -52,7 +52,7 @@ class BackingStore(object):
     capabilities = ()
 
     def __init__(self, uri, **kwargs):
-        """The kwargs are used to configure the backend so it can 
+        """The kwargs are used to configure the backend so it can
         provide its interface. See specific backends for details
         """
         pass
@@ -394,8 +394,9 @@ class FileBackingStore(BackingStore):
 
         return self.indexmanager.get_uniquevaluesfor(propertyname)
 
-    def find(self, query):
-        return self.indexmanager.search(query)
+    def find(self, query, order_by=None, limit=None):
+        if not limit: limit = 4096
+        return self.indexmanager.search(query, end_index=limit, order_by=order_by)
 
     def stop(self):
         self.indexmanager.stop()
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index d197eda..3c99177 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -406,10 +406,20 @@ class DataStore(dbus.service.Object):
     @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
     def Created(self, uid): pass
+
+    def _single_search(self, mountpoint, query, order_by, limit):
+        results, count = mountpoint.find(query.copy(), order_by, limit)
+        return list(results), count, 1
 
-    def _multiway_search(self, query):
+    def _multiway_search(self, query, order_by=None, limit=None):
         mountpoints = query.pop('mountpoints', self.mountpoints)
         mountpoints = [self.mountpoints[str(m)] for m in mountpoints]
+
+        if len(mountpoints) == 1:
+            # Fast path the single mountpoint case
+            return self._single_search(mountpoints[0], query, order_by, limit)
+
         results = []
         # XXX: the merge will become *much* more complex when
         # distributed versioning is implemented.
@@ -417,7 +427,7 @@ class DataStore(dbus.service.Object):
         # some queries mutate the query-dict so we pass a copy each
         # time
         for mp in mountpoints:
-            result, count = mp.find(query.copy())
+            result, count = mp.find(query.copy(), order_by, limit)
             results.append(result)
 
         # merge
@@ -430,7 +440,7 @@ class DataStore(dbus.service.Object):
                 # XXX: age/version check
                 d[hit.id] = hit
 
-        return d, len(d)
+        return d, len(d), len(results)
 
     @dbus.service.method(DS_DBUS_INTERFACE,
                          in_signature='a{sv}',
@@ -482,8 +492,10 @@ class DataStore(dbus.service.Object):
 
         # distribute the search to all the mountpoints unless a
         # backingstore id set is specified
-        results, count = self._multiway_search(kwargs)
-
+        # backends may be able to return sorted results; if there is
+        # only a single backend in the query we can use its pre-sorted
+        # results directly
+        results, count, results_from = self._multiway_search(kwargs, order_by, limit)
 
         # ordering is difficult when we are dealing with sets from
         # more than one source. The model is this.
@@ -491,64 +503,65 @@ class DataStore(dbus.service.Object):
         # in post processing. This allows us to assemble partially
         # database sorted results from many sources and quickly
         # combine them.
-        if order_by:
-            # resolve key names to columns
-            if isinstance(order_by, basestring):
-                order_by = [o.strip() for o in order_by.split(',')]
-
-            if not isinstance(order_by, list):
-                logging.debug("bad query, order_by should be a list of property names")
-                order_by = None
-
-            # generate a sort function based on the complete set of
-            # ordering criteria which includes the primary sort
-            # criteria as well to keep it stable.
-            def comparator(a, b):
-                # we only sort on properties so
-                for criteria in order_by:
-                    mode = 1 # ascending
-                    if criteria.startswith('-'):
-                        mode = -1
-                        criteria = criteria[1:]
-                    pa = a.get_property(criteria, None)
-                    pb = b.get_property(criteria, None)
-                    r = cmp(pa, pb) * mode
-                    if r != 0: return r
-                return 0
-
-
-            r = results.values()
-            r.sort(comparator)
-            results = r
-        else:
-            results = results.values()
+        if results_from > 1:
+            if order_by:
+                # resolve key names to columns
+                if isinstance(order_by, basestring):
+                    order_by = [o.strip() for o in order_by.split(',')]
+
+                if not isinstance(order_by, list):
+                    logging.debug("bad query, order_by should be a list of property names")
+                    order_by = None
+
+                # generate a sort function based on the complete set of
+                # ordering criteria which includes the primary sort
+                # criteria as well to keep it stable.
+                def comparator(a, b):
+                    # we only sort on properties so
+                    for criteria in order_by:
+                        mode = 1 # ascending
+                        if criteria.startswith('-'):
+                            mode = -1
+                            criteria = criteria[1:]
+                        pa = a.get_property(criteria, None)
+                        pb = b.get_property(criteria, None)
+                        r = cmp(pa, pb) * mode
+                        if r != 0: return r
+                    return 0
+
+                r = results.values()
+                r.sort(comparator)
+                results = r
+            else:
+                results = results.values()
 
         d = []
+        c = 0
+        if results_from == 1:
+            mp = results[0].backingstore.id
+        else:
+            mp = None
+
         for r in results:
-            props = {}
-            props.update(r.properties)
+            props = r.properties
 
             # on versioning stores uid will be different
             # than r.id but must be set
-            if 'uid' not in props:
-                props['uid'] = r.id
+            #if 'uid' not in props:
+            props['uid'] = r.id
 
-            if 'mountpoint' not in props:
-                props['mountpoint'] = r.backingstore.id
+            #if 'mountpoint' not in props:
+            props['mountpoint'] = mp and mp or r.backingstore.id
+
+            # filename not included in find results
+            #props['filename'] = ''
 
-            filename = ''
-            if include_files:
-                try: filename = r.filename
-                except KeyError: pass
-                # XXX: this means that find never shows the internally
-                # stored filename attribute (which is private)
-            props['filename'] = filename
             d.append(props)
-
-        if limit:
-            d = d[offset: offset+limit]
+            c += 1
+            if limit and c > limit: break
 
-        return (d, len(results))
+        return (d, len(d))
 
     def get(self, uid, rev=None, mountpoint=None):
         mp = self._resolveMountpoint(mountpoint)
diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py
index f17e842..dfbe0df 100644
--- a/src/olpc/datastore/model.py
+++ b/src/olpc/datastore/model.py
@@ -188,7 +188,7 @@ class Content(object):
         self._file = None
         self._model = model
         self._file = None
-        
+
     def __repr__(self):
         return "<%s %s>" %(self.__class__.__name__,
                            self.properties)
@@ -206,6 +206,7 @@ class Content(object):
 
     @property
     def properties(self):
+        d = {}
         for k, v in self.data.iteritems():
             if k in EXCLUDED_PROPERTIES: continue
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index d4128d4..1faa6a9 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -40,6 +40,9 @@ CREATE = 1
 UPDATE = 2
 DELETE = 3
 
+ADD = 1
+REMOVE = 2
+
 class ContentMappingIter(object):
     """An iterator over a set of results from a search.
 
@@ -159,6 +162,7 @@ class IndexManager(object):
                 elif operation is UPDATE:
                     self.write_index.replace(doc)
                     logger.info("updated %s:%s" % (uid, vid))
+                self.flush()
 
                 # now change CREATE to UPDATE as we set the
                 # properties already
@@ -261,7 +265,7 @@ class IndexManager(object):
 
                     # we do flush on each record (or set for enqueue
                     # sequences) now
-                    self.flush()
+                    #self.flush()
                 except:
                     logger.exception("Error in indexer")
@@ -507,7 +511,7 @@ class IndexManager(object):
 
     #
     # Search
-    def search(self, query, start_index=0, end_index=4096):
+    def search(self, query, start_index=0, end_index=4096, order_by=None):
         """search the xapian store.
         query is a string defining the search in standard web search
         syntax.
@@ -516,6 +520,7 @@ class IndexManager(object):
         to indicate that it is required to be absent.
""" ri = self.read_index + if not query: q = self.read_index.query_all() elif isinstance(query, dict): @@ -552,9 +557,16 @@ class IndexManager(object): else: q = self.parse_query(query) - return self._search(q, start_index, end_index) + if order_by and isinstance(order_by, list): + # secore only handles a single item, not a multilayer sort + order_by = order_by[0] + + return self._search(q, start_index, end_index, sortby=order_by) def _search(self, q, start_index, end_index, sortby=None): + start_index = int(start_index) + end_index = int(end_index) + sortby = str(sortby) results = self.read_index.search(q, start_index, end_index, sortby=sortby) count = results.matches_estimated diff --git a/tests/xapianindex.txt b/tests/xapianindex.txt index 583f589..6ef163f 100644 --- a/tests/xapianindex.txt +++ b/tests/xapianindex.txt @@ -86,11 +86,11 @@ Partial search... -We also support tagging of documents. - ->>> im.tag(uid, "foo bar") ->>> assert expect_single(im.search('tags:foo')).id == uid - +# We also support tagging of documents. +# +# >>> im.tag(uid, "foo bar") +# >>> assert expect_single(im.search('tags:foo')).id == uid + Cleanly shut down. >>> im.stop() -- cgit v0.9.1