Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/document/index_proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/document/index_proxy.py')
-rw-r--r--  sugar_network/document/index_proxy.py  323
1 files changed, 323 insertions, 0 deletions
diff --git a/sugar_network/document/index_proxy.py b/sugar_network/document/index_proxy.py
new file mode 100644
index 0000000..fcde4c1
--- /dev/null
+++ b/sugar_network/document/index_proxy.py
@@ -0,0 +1,323 @@
+# Copyright (C) 2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import bisect
+import logging
+from os.path import exists
+
+import xapian
+
+from active_toolkit import util
+from active_document import index_queue, env
+from active_document.storage import Storage
+from active_document.index import IndexReader, IndexWriter
+from active_document.metadata import StoredProperty
+
+
+_logger = logging.getLogger('active_document.index_proxy')
+
+
+class IndexProxy(IndexReader):
+
+ def __init__(self, root, metadata):
+ self._commit_seqno = 0
+ self._cache_seqno = 1
+ self._term_props = {}
+ self._pages = {}
+ self._dirty = False
+
+ for prop in metadata.values():
+ if isinstance(prop, StoredProperty) and \
+ prop.permissions & env.ACCESS_WRITE:
+ self._term_props[prop.name] = prop
+
+ IndexReader.__init__(self, root, metadata)
+
+ def commit(self):
+ index_queue.commit_and_wait(self.metadata.name)
+
+ def close(self):
+ pass
+
+ def get_cached(self, guid):
+ self._drop_pages()
+ result = {}
+ for page in self._sorted_pages:
+ cached = page.get(guid)
+ if cached is not None:
+ result.update(cached.properties)
+ return result
+
+ def store(self, guid, properties, new, pre_cb=None, post_cb=None, *args):
+ if properties and new is not None:
+ if new:
+ orig = None
+ else:
+ orig = self.get_cached(guid)
+ try:
+ # XXX Avoid creating Storage every time
+ record = Storage(self._path, self.metadata).get(guid)
+ for prop in self._term_props.values():
+ if prop.name not in orig:
+ orig[prop.name] = record.get(prop.name)['value']
+ except env.NotFound:
+ pass
+ # Needs to be called before `index_queue.put()`
+ # to give it a chance to read original properties from the storage
+ page = self._pages.get(self._cache_seqno)
+ if page is None:
+ page = self._pages[self._cache_seqno] = \
+ _CachedPage(self._term_props)
+ _logger.debug('New cache page for %r with seqno %s to '
+ 'insert %r', self.metadata.name, self._cache_seqno,
+ guid)
+ page.update(guid, properties, orig)
+
+ self._put(IndexWriter.store, guid, properties, new, pre_cb, post_cb,
+ *args)
+
+ def delete(self, guid, post_cb=None, *args):
+ self._put(IndexWriter.delete, guid, post_cb, *args)
+
+ def find(self, query):
+ self._do_open(False)
+
+ if query.no_cache:
+ pages = []
+ else:
+ pages = self._sorted_pages
+
+ def next_page_find(query):
+ if pages:
+ return pages.pop().find(query, next_page_find)
+ elif self._db is None:
+ return [], 0
+ else:
+ return IndexReader.find(self, query)
+
+ return next_page_find(query)
+
+ @property
+ def _sorted_pages(self):
+ return [self._pages[i] for i in sorted(self._pages.keys())]
+
+ def _do_open(self, reset):
+ if reset:
+ self._dirty = True
+
+ if self._db is None:
+ if exists(self._path):
+ self._drop_pages()
+ else:
+ return
+ elif not self._drop_pages() and not self._dirty:
+ return
+
+ try:
+ if self._db is None:
+ self._db = xapian.Database(self._path)
+ _logger.debug('Opened %r RO index', self.metadata.name)
+ else:
+ self._db.reopen()
+ _logger.debug('Re-opened %r RO index', self.metadata.name)
+ self._dirty = False
+ except Exception:
+ util.exception(_logger,
+ 'Cannot open %r RO index', self.metadata.name)
+ self._db = None
+
+ def _drop_pages(self):
+ commit_seqno = index_queue.commit_seqno(self.metadata.name)
+ if commit_seqno == self._commit_seqno:
+ return False
+ for seqno in self._pages.keys():
+ if seqno <= commit_seqno:
+ del self._pages[seqno]
+ _logger.debug('Drop cache page for %r with seqno %s',
+ self.metadata.name, seqno)
+ self._commit_seqno = commit_seqno
+ self._dirty = True
+ return True
+
+ def _put(self, op, *args):
+ _logger.debug('Push %r(%r) to %rs queue',
+ op, args, self.metadata.name)
+ new_cache_seqno = index_queue.put(self.metadata.name, op, *args)
+ if new_cache_seqno != self._cache_seqno:
+ self._cache_seqno = new_cache_seqno
+ self._pages[new_cache_seqno] = _CachedPage(self._term_props)
+ _logger.debug('New cache page for %r with seqno %s',
+ self.metadata.name, new_cache_seqno)
+
+
class _CachedPage(dict):
    """guid -> _CachedDocument map of changes queued after one seqno."""

    def __init__(self, term_props):
        # Shared mapping of searchable property names to StoredProperty.
        self._term_props = term_props

    def update(self, guid, props, orig):
        # NOTE: intentionally shadows dict.update() with a different
        # signature; pages are populated exclusively through this method.
        existing = self.get(guid)
        if existing is None:
            self[guid] = _CachedDocument(self._term_props, guid, props, orig)
        else:
            existing.update(props)

    def find(self, query, direct_find):
        """Run `direct_find(query)` and patch results with this page.

        Returns the same `(documents, total)` shape as `direct_find`;
        `documents` may be wrapped in a patching generator.
        """
        if 'guid' in query.request:
            # Exact-guid lookup: overlay cached properties on the found
            # document, or synthesize the document if the index has none.
            documents, total = direct_find(query)
            cache = self.get(query.request['guid'])
            if cache is None:
                return documents, total

            def patched_guid_find():
                processed = False
                for guid, props in documents:
                    processed = True
                    props.update(cache.properties)
                    yield guid, props
                if not processed:
                    yield cache.guid, cache.properties

            return patched_guid_find(), total

        if not self:
            # Nothing cached in this page; pass the query through.
            return direct_find(query)

        adds, deletes, updates = self._patch_find(query.request)
        if not adds and not deletes and not updates:
            return direct_find(query)

        # Ask for extra rows to compensate for documents that will be
        # filtered out, then trim back to the caller's original limit.
        orig_limit = query.limit
        query.limit += len(deletes)
        documents, total = direct_find(query)
        total.value += len(adds)

        def patched_find(orig_limit):
            for guid, props in documents:
                if orig_limit < 1:
                    break
                if guid in deletes:
                    # No longer matches with pending changes applied.
                    total.value -= 1
                    continue
                cache = updates.get(guid)
                if cache is not None:
                    props.update(cache.properties)
                yield guid, props
                orig_limit -= 1

            # Append cached documents that match only after the pending
            # changes (sorted by guid via bisect.insort in _patch_find).
            for doc in adds:
                if orig_limit < 1:
                    break
                yield doc.guid, doc.properties
                orig_limit -= 1

        return patched_find(orig_limit), total

    def _patch_find(self, request):
        """Classify cached documents against `request` term filters.

        Returns `(adds, deletes, updates)`:
        - adds: documents that match only with pending changes applied;
        - deletes: guids that matched before but no longer do;
        - updates: documents present in both views, needing property patching.
        On an undecodable request value returns `(None, None, None)` so the
        caller falls back to the unpatched result.
        """
        adds = []
        deletes = set()
        updates = {}

        terms = set()
        for prop_name, value in request.items():
            prop = self._term_props.get(prop_name)
            if prop is None:
                continue
            try:
                value = prop.decode(value)
            except ValueError, error:
                _logger.debug('Wrong request property value %r for %r '
                        'property, thus the whole request is empty: %s',
                        value, prop_name, error)
                return None, None, None
            terms.add(_TermValue(prop, value))

        for cache in self.values():
            if cache.new:
                # Brand new document: it can only be an addition.
                if terms.issubset(cache.terms):
                    bisect.insort(adds, cache)
            else:
                if terms:
                    if terms.issubset(cache.terms):
                        if not terms.issubset(cache.orig_terms):
                            # Matches now but not before: addition.
                            bisect.insort(adds, cache)
                            continue
                    else:
                        if terms.issubset(cache.orig_terms):
                            # Matched before but not any more: deletion.
                            deletes.add(cache.guid)
                        continue
                updates[cache.guid] = cache

        return adds, deletes, updates
+
+
+class _CachedDocument(object):
+
+ def __init__(self, term_props, guid, properties, orig):
+ self.guid = guid
+ self.properties = properties.copy()
+ self.new = orig is None
+ self.terms = set()
+ self.orig_terms = set()
+ self._term_props = term_props
+
+ for prop in term_props.values():
+ if orig is not None:
+ self.orig_terms.add(_TermValue(prop, orig.get(prop.name)))
+
+ self._update_terms()
+
+ def __sort__(self, other):
+ return cmp(self.guid, other.guid)
+
+ def update(self, properties):
+ self.properties.update(properties)
+ self._update_terms()
+
+ def _update_terms(self):
+ self.terms.clear()
+ orig_terms = {}
+ for i in self.orig_terms:
+ orig_terms[i.prop] = i.value
+ for prop in self._term_props.values():
+ term = self.properties.get(prop.name, orig_terms.get(prop))
+ self.terms.add(_TermValue(prop, term))
+
+
class _TermValue:
    """Property/value pair usable as a searchable term in term sets.

    Equality is defined through Python 2 `__cmp__`: for composite
    (multi-value) properties two values compare equal when either non-empty
    value set is a subset of the other. `__hash__` uses only the property
    name so candidate terms for the same property land in the same hash
    bucket and set operations fall back to `__cmp__` for the final verdict.
    """

    def __init__(self, prop, value):
        self.prop = prop
        self.value = value

    def __repr__(self):
        return '%s=%s' % (self.prop.name, self.value)

    def __cmp__(self, other):
        # NOTE: Python 2 only; `cmp` and `__cmp__` do not exist in Python 3.
        result = cmp(self.prop.name, other.prop.name)
        if result:
            return result
        if not self.prop.composite:
            return cmp(self.value, other.value)
        # Composite properties: a non-empty subset relation in either
        # direction counts as a match (compare equal).
        self_value = set(self.value)
        other_value = set(other.value)
        if self_value and self_value.issubset(other_value) or \
                other_value and other_value.issubset(self_value):
            return 0
        else:
            return cmp(self.value, other.value)

    def __hash__(self):
        # Hash only by property name; see class docstring for why this is
        # consistent with the fuzzy equality above.
        return hash(self.prop.name)