diff options
Diffstat (limited to 'sugar_network/document/index_proxy.py')
-rw-r--r-- | sugar_network/document/index_proxy.py | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/sugar_network/document/index_proxy.py b/sugar_network/document/index_proxy.py new file mode 100644 index 0000000..fcde4c1 --- /dev/null +++ b/sugar_network/document/index_proxy.py @@ -0,0 +1,323 @@ +# Copyright (C) 2012 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import bisect +import logging +from os.path import exists + +import xapian + +from active_toolkit import util +from active_document import index_queue, env +from active_document.storage import Storage +from active_document.index import IndexReader, IndexWriter +from active_document.metadata import StoredProperty + + +_logger = logging.getLogger('active_document.index_proxy') + + +class IndexProxy(IndexReader): + + def __init__(self, root, metadata): + self._commit_seqno = 0 + self._cache_seqno = 1 + self._term_props = {} + self._pages = {} + self._dirty = False + + for prop in metadata.values(): + if isinstance(prop, StoredProperty) and \ + prop.permissions & env.ACCESS_WRITE: + self._term_props[prop.name] = prop + + IndexReader.__init__(self, root, metadata) + + def commit(self): + index_queue.commit_and_wait(self.metadata.name) + + def close(self): + pass + + def get_cached(self, guid): + self._drop_pages() + result = {} + for page in self._sorted_pages: + cached = page.get(guid) + if cached is not None: + result.update(cached.properties) + return result + + def store(self, guid, properties, new, pre_cb=None, post_cb=None, *args): + if properties and new is not None: + if new: + orig = None + else: + orig = self.get_cached(guid) + try: + # XXX Avoid creating Storage every time + record = Storage(self._path, self.metadata).get(guid) + for prop in self._term_props.values(): + if prop.name not in orig: + orig[prop.name] = record.get(prop.name)['value'] + except env.NotFound: + pass + # Needs to be called before `index_queue.put()` + # to give it a chance to read original properties from the storage + page = self._pages.get(self._cache_seqno) + if page is None: + page = self._pages[self._cache_seqno] = \ + _CachedPage(self._term_props) + _logger.debug('New cache page for %r with seqno %s to ' + 'insert %r', self.metadata.name, self._cache_seqno, + guid) + page.update(guid, properties, orig) + + self._put(IndexWriter.store, guid, properties, new, pre_cb, post_cb, + *args) + + def delete(self, guid, post_cb=None, *args): + self._put(IndexWriter.delete, guid, post_cb, *args) + + def find(self, query): + self._do_open(False) + + if query.no_cache: + pages = [] + else: + pages = self._sorted_pages + + def next_page_find(query): + if pages: + return pages.pop().find(query, next_page_find) + elif self._db is None: + return [], 0 + else: + return IndexReader.find(self, query) + + return next_page_find(query) + + @property + def _sorted_pages(self): + return [self._pages[i] for i in sorted(self._pages.keys())] + + def _do_open(self, reset): + if reset: + self._dirty = True + + if self._db is None: + if exists(self._path): + self._drop_pages() + else: + return + elif not self._drop_pages() and not self._dirty: + return + + try: + if self._db is None: + self._db = xapian.Database(self._path) + _logger.debug('Opened %r RO index', self.metadata.name) + else: + self._db.reopen() + _logger.debug('Re-opened %r RO index', self.metadata.name) + self._dirty = False + except Exception: + util.exception(_logger, + 'Cannot open %r RO index', self.metadata.name) + self._db = None + + def _drop_pages(self): + commit_seqno = index_queue.commit_seqno(self.metadata.name) + if commit_seqno == self._commit_seqno: + return False + for seqno in self._pages.keys(): + if seqno <= commit_seqno: + del self._pages[seqno] + _logger.debug('Drop cache page for %r with seqno %s', + self.metadata.name, seqno) + self._commit_seqno = commit_seqno + self._dirty = True + return True + + def _put(self, op, *args): + _logger.debug('Push %r(%r) to %rs queue', + op, args, self.metadata.name) + new_cache_seqno = index_queue.put(self.metadata.name, op, *args) + if new_cache_seqno != self._cache_seqno: + self._cache_seqno = new_cache_seqno + self._pages[new_cache_seqno] = _CachedPage(self._term_props) + _logger.debug('New cache page for %r with seqno %s', + self.metadata.name, new_cache_seqno) + + +class _CachedPage(dict): + + def __init__(self, term_props): + self._term_props = term_props + + def update(self, guid, props, orig): + existing = self.get(guid) + if existing is None: + self[guid] = _CachedDocument(self._term_props, guid, props, orig) + else: + existing.update(props) + + def find(self, query, direct_find): + if 'guid' in query.request: + documents, total = direct_find(query) + cache = self.get(query.request['guid']) + if cache is None: + return documents, total + + def patched_guid_find(): + processed = False + for guid, props in documents: + processed = True + props.update(cache.properties) + yield guid, props + if not processed: + yield cache.guid, cache.properties + + return patched_guid_find(), total + + if not self: + return direct_find(query) + + adds, deletes, updates = self._patch_find(query.request) + if not adds and not deletes and not updates: + return direct_find(query) + + orig_limit = query.limit + query.limit += len(deletes) + documents, total = direct_find(query) + total.value += len(adds) + + def patched_find(orig_limit): + for guid, props in documents: + if orig_limit < 1: + break + if guid in deletes: + total.value -= 1 + continue + cache = updates.get(guid) + if cache is not None: + props.update(cache.properties) + yield guid, props + orig_limit -= 1 + + for doc in adds: + if orig_limit < 1: + break + yield doc.guid, doc.properties + orig_limit -= 1 + + return patched_find(orig_limit), total + + def _patch_find(self, request): + adds = [] + deletes = set() + updates = {} + + terms = set() + for prop_name, value in request.items(): + prop = self._term_props.get(prop_name) + if prop is None: + continue + try: + value = prop.decode(value) + except ValueError, error: + _logger.debug('Wrong request property value %r for %r ' + 'property, thus the whole request is empty: %s', + value, prop_name, error) + return None, None, None + terms.add(_TermValue(prop, value)) + + for cache in self.values(): + if cache.new: + if terms.issubset(cache.terms): + bisect.insort(adds, cache) + else: + if terms: + if terms.issubset(cache.terms): + if not terms.issubset(cache.orig_terms): + bisect.insort(adds, cache) + continue + else: + if terms.issubset(cache.orig_terms): + deletes.add(cache.guid) + continue + updates[cache.guid] = cache + + return adds, deletes, updates + + +class _CachedDocument(object): + + def __init__(self, term_props, guid, properties, orig): + self.guid = guid + self.properties = properties.copy() + self.new = orig is None + self.terms = set() + self.orig_terms = set() + self._term_props = term_props + + for prop in term_props.values(): + if orig is not None: + self.orig_terms.add(_TermValue(prop, orig.get(prop.name))) + + self._update_terms() + + def __sort__(self, other): + return cmp(self.guid, other.guid) + + def update(self, properties): + self.properties.update(properties) + self._update_terms() + + def _update_terms(self): + self.terms.clear() + orig_terms = {} + for i in self.orig_terms: + orig_terms[i.prop] = i.value + for prop in self._term_props.values(): + term = self.properties.get(prop.name, orig_terms.get(prop)) + self.terms.add(_TermValue(prop, term)) + + +class _TermValue: + + def __init__(self, prop, value): + self.prop = prop + self.value = value + + def __repr__(self): + return '%s=%s' % (self.prop.name, self.value) + + def __cmp__(self, other): + result = cmp(self.prop.name, other.prop.name) + if result: + return result + if not self.prop.composite: + return cmp(self.value, other.value) + self_value = set(self.value) + other_value = set(other.value) + if self_value and self_value.issubset(other_value) or \ + other_value and other_value.issubset(self_value): + return 0 + else: + return cmp(self.value, other.value) + + def __hash__(self): + return hash(self.prop.name) |