Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/gdatastore/index.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/gdatastore/index.py')
-rw-r--r--src/gdatastore/index.py394
1 files changed, 394 insertions, 0 deletions
diff --git a/src/gdatastore/index.py b/src/gdatastore/index.py
new file mode 100644
index 0000000..61e18da
--- /dev/null
+++ b/src/gdatastore/index.py
@@ -0,0 +1,394 @@
+#
+# Author: Sascha Silbe <sascha-pgp@silbe.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Gdatastore metadata index interface
+"""
+
+import logging
+import os
+import sys
+
+import xapian
+from xapian import Document, Enquire, Query, WritableDatabase
+
+
+_CURRENT_VERSION = 1
+_STANDARD_TERMS = {
+ 'activity': {'prefix': 'Xactname', 'type': str},
+ 'activity_id': {'prefix': 'Xactid', 'type': str},
+ 'keep': {'prefix': 'Xkeep', 'type': str},
+ 'mime_type': {'prefix': 'T', 'type': str},
+ 'tags': {'prefix': 'K', 'type': unicode},
+ 'title': {'prefix': 'S', 'type': unicode},
+ 'tree_id': {'prefix': 'Xtree', 'type': str},
+ 'version_id': {'prefix': 'Xversion', 'type': str},
+}
+_VALUE_TREE_ID = 0
+_VALUE_VERSION_ID = 1
+_VALUE_MTIME = 2
+_VALUE_SIZE = 3
+_VALUE_CTIME = 4
+_STANDARD_VALUES = {
+ 'creation_time': {'number': _VALUE_CTIME, 'type': float},
+ 'filesize': {'number': _VALUE_SIZE, 'type': int},
+ 'timestamp': {'number': _VALUE_MTIME, 'type': float},
+ 'tree_id': {'number': _VALUE_TREE_ID, 'type': str},
+ 'version_id': {'number': _VALUE_VERSION_ID, 'type': str},
+}
+_IGNORE_PROPERTIES = ['preview']
+_PREFIX_FULL_VALUE = 'Xf'
+_PREFIX_OBJECT_ID = 'Q'
+_LIMIT_MAX = 2 ** 31 - 1
+_DOCID_REVERSE_MAP = {True: Enquire.DESCENDING, False: Enquire.ASCENDING}
+
+
+class DSIndexError(Exception):
+ pass
+
+
+class TermGenerator(xapian.TermGenerator):
+
+ def __init__(self):
+ self._document = None
+ xapian.TermGenerator.__init__(self)
+
+ def index_document(self, document, properties):
+ for name, info in _STANDARD_VALUES.items():
+ if name not in properties:
+ continue
+
+ document.add_value(info['number'],
+ _serialise_value(info, properties[name]))
+
+ self._document = document
+ self.set_document(document)
+
+ properties = dict(properties)
+ self._index_known(properties)
+ self._index_unknown(properties)
+
+ def _index_known(self, properties):
+ """Index standard properties and remove them from the input."""
+ for name, info in _STANDARD_TERMS.items():
+ if name not in properties:
+ continue
+
+ self._index_property(properties.pop(name), info['prefix'])
+
+ def _index_unknown(self, properties):
+ """
+ Index all given properties.
+
+ Expects not to get passed any standard term-stored property.
+ """
+ for name, value in properties.items():
+ if name in _IGNORE_PROPERTIES or name in _STANDARD_VALUES:
+ continue
+
+ if isinstance(value, unicode):
+ value = value.encode('utf-8')
+ elif not isinstance(value, str):
+ value = str(value)
+
+ self._index_property(_prefix_for_unknown(name), value)
+
+ def _index_property(self, value, prefix):
+ # We need to add the full value (i.e. not split into words), too, so
+ # we can enumerate unique values. It also simplifies setting up
+ # dictionary-based queries.
+ self._document.add_term(_PREFIX_FULL_VALUE + prefix + value)
+ self.index_text(value, 1, prefix)
+ self.increase_termpos()
+
+
+class QueryParser(xapian.QueryParser):
+ """
+ QueryParser that understands dictionaries and Xapian query strings.
+
+ The dictionary may contain property names as keys and basic types
+ (exact match), 2-tuples (range, only valid for value-stored
+ standard properties) and lists (multiple exact matches joined with
+ OR) as values.
+ An empty dictionary matches everything. Queries from different keys
+ (i.e. different property names) are joined with AND.
+
+ Full text search (Xapian query string) is only supported for standard
+ properties.
+ """
+
+ _FLAGS = (xapian.QueryParser.FLAG_PHRASE |
+ xapian.QueryParser.FLAG_BOOLEAN |
+ xapian.QueryParser.FLAG_LOVEHATE |
+ xapian.QueryParser.FLAG_WILDCARD)
+
+ def __init__(self):
+ xapian.QueryParser.__init__(self)
+ for name, info in _STANDARD_TERMS.items():
+ self.add_prefix(name, info['prefix'])
+ self.add_prefix('', info['prefix'])
+
+ def _parse_query_term(self, prefix, value):
+ if isinstance(value, list):
+ subqueries = [self._parse_query_term(prefix, word)
+ for word in value]
+ return Query(Query.OP_OR, subqueries)
+
+ return Query(_PREFIX_FULL_VALUE + prefix + str(value))
+
+ def _parse_query_value_range(self, info, value):
+ if len(value) != 2:
+ raise TypeError('Only tuples of size 2 have a defined meaning.'
+ ' Did you mean to pass a list instead?')
+
+ start, end = value
+ return Query(Query.OP_VALUE_RANGE, info['number'],
+ _serialise_value(info, start),
+ _serialise_value(info, end))
+
+ def _parse_query_value(self, info, value):
+ if isinstance(value, list):
+ subqueries = [self._parse_query_value(info, word)
+ for word in value]
+ return Query(Query.OP_OR, subqueries)
+
+ elif isinstance(value, tuple):
+ return self._parse_query_value_range(info, value)
+
+ elif isinstance(value, dict):
+ # compatibility option for timestamp: {'start': 0, 'end': 1}
+ start = value.get('start', 0)
+ end = value.get('end', sys.maxint)
+ return self._parse_query_value_range(info, (start, end))
+
+ else:
+ return self._parse_query_value_range(info, (value, value))
+
+ def _parse_query_xapian(self, query_str):
+ return xapian.QueryParser.parse_query(self, query_str,
+ QueryParser._FLAGS, '')
+
+ def parse_datastore_query(self, query_dict, query_string):
+ logging.debug('query_dict=%r, query_string=%r', query_dict,
+ query_string)
+ queries = []
+ query_dict = dict(query_dict or {})
+
+ if query_string is not None:
+ queries.append(self._parse_query_xapian(str(query_string)))
+
+ for name, value in query_dict.items():
+ if name in _STANDARD_TERMS:
+ prefix = _STANDARD_TERMS[name]['prefix']
+ query = self._parse_query_term(prefix, value)
+ elif name in _STANDARD_VALUES:
+ info = _STANDARD_VALUES[name]
+ query = self._parse_query_value(info, value)
+ else:
+ logging.warning('Unknown term: %r=%r', name, value)
+ continue
+
+ queries.append(query)
+
+ if not queries:
+ queries.append(Query(''))
+
+ logging.debug('queries: %r', [str(query) for query in queries])
+ return Query(Query.OP_AND, queries)
+
+
+class Index(object):
+
+ def __init__(self, base_dir):
+ self._base_dir = base_dir
+ self._database = None
+
+ if not os.path.exists(self._base_dir):
+ os.makedirs(self._base_dir)
+ self._create_database()
+
+ self._migrate()
+ self._query_parser = QueryParser()
+ self._query_parser.set_database(self._database)
+
+ def close(self):
+ """Close index database if it is open."""
+ if not self._database:
+ return
+
+ self._database.close()
+ self._database = None
+
+ def contains(self, object_id):
+ postings = self._database.postlist(_object_id_term(object_id))
+ try:
+ _ = postings.next()
+ except StopIteration:
+ return False
+ return True
+
+ def delete(self, object_id):
+ object_id_term = _object_id_term(object_id)
+ if __debug__:
+ enquire = Enquire(self._database)
+ enquire.set_query(Query(object_id_term))
+ documents = [hit.document for hit in enquire.get_mset(0, 2, 2)]
+ assert len(documents) == 1
+
+ self._database.delete_document(object_id_term)
+
+ def find(self, query_dict, query_string, options):
+ offset = options.pop('offset', 0)
+ limit = options.pop('limit', _LIMIT_MAX)
+ order_by = options.pop('order_by', ['+timestamp'])[0]
+ all_versions = options.pop('all_versions', False)
+ check_at_least = options.pop('check_at_least', offset + limit + 1)
+
+ enquire = Enquire(self._database)
+ query = self._query_parser.parse_datastore_query(query_dict,
+ query_string)
+ enquire.set_query(query)
+
+ sort_reverse = {'+': True, '-': False}[order_by[0]]
+ try:
+ sort_value_nr = _STANDARD_VALUES[order_by[1:]]['number']
+ except KeyError:
+ logging.warning('Trying to order by unknown property: %r',
+ order_by[1:])
+ sort_value_nr = _VALUE_MTIME
+
+ enquire.set_sort_by_value(sort_value_nr, reverse=sort_reverse)
+ enquire.set_docid_order(_DOCID_REVERSE_MAP[sort_reverse])
+
+ if not all_versions:
+ enquire.set_collapse_key(_VALUE_TREE_ID)
+
+ if not all_versions and order_by != '+timestamp':
+ # Xapian doesn't support using a different sort order while
+ # collapsing (which needs to be timestamp in our case), so
+ # we need to query everything and sort+limit ourselves.
+ enquire.set_sort_by_value(_VALUE_MTIME, True)
+ enquire.set_docid_order(enquire.ASCENDING)
+ query_result = enquire.get_mset(0, _LIMIT_MAX, _LIMIT_MAX)
+ else:
+ logging.debug('Offset/limit using Xapian: %d %d %d', offset, limit, check_at_least)
+ query_result = enquire.get_mset(offset, limit, check_at_least)
+
+ total_count = query_result.get_matches_lower_bound()
+ documents = [hit.document for hit in query_result]
+
+ if (not all_versions) and (order_by != '+timestamp'):
+ _sort_documents(documents, sort_value_nr, sort_reverse)
+ del documents[offset + limit:]
+
+ #object_ids = [(document.get_value(_VALUE_TREE_ID),
+ # document.get_value(_VALUE_VERSION_ID))
+ # for document in documents]
+ entries = [deserialise_metadata(document.get_data())
+ for document in documents]
+
+ return entries, total_count
+
+ def store(self, object_id, properties):
+ logging.debug('store(%r, %r)', object_id, properties)
+ assert (properties['tree_id'], properties['version_id']) == object_id
+ id_term = _object_id_term(object_id)
+ document = Document()
+ logging.debug('serialised=%r', serialiase_metadata(properties))
+ document.set_data(serialiase_metadata(properties))
+ document.add_term(id_term)
+ term_generator = TermGenerator()
+ term_generator.index_document(document, properties)
+ assert (document.get_value(_VALUE_TREE_ID), document.get_value(_VALUE_VERSION_ID)) == object_id
+ self._database.replace_document(id_term, document)
+
+ def _create_database(self):
+ database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN)
+ database.set_metadata('gdatastore_version', str(_CURRENT_VERSION))
+ database.close()
+
+ def _migrate(self):
+ self._database = WritableDatabase(self._base_dir,
+ xapian.DB_CREATE_OR_OPEN)
+ version = int(self._database.get_metadata('gdatastore_version'))
+
+ if version > _CURRENT_VERSION:
+ raise DSIndexError('Unsupported index version: %d > %d' %
+ (version, _CURRENT_VERSION))
+
+
+def deserialise_metadata(serialised):
+ """Deserialise a string generated by serialise_metadata().
+
+ Do NOT pass any value that might have been modified since it was generated
+ by serialiase_metadata().
+ """
+ return eval(serialised)
+
+
+def serialiase_metadata(metadata):
+ return repr(_to_native(metadata))
+
+
+def _object_id_term(object_id):
+ return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id
+
+
+def _prefix_for_unknown(name):
+ return 'Xu%d:%s' % (len(name), unicode(name).encode('utf-8'))
+
+
+def _serialise_value(info, value):
+ if info['type'] in (float, int, long):
+ return xapian.sortable_serialise(info['type'](value))
+ elif info['type'] == unicode:
+ return unicode(value).encode('utf-8')
+
+ return str(info['type'](value))
+
+
+def _sort_documents(documents, sort_value_nr, sort_reverse):
+ def _cmp(document_a, document_b):
+ value_a = document_a.get_value(sort_value_nr)
+ value_b = document_b.get_value(sort_value_nr)
+ if value_a < value_b:
+ return -1
+ elif value_a > value_b:
+ return 1
+
+ docid_a = document_a.get_docid()
+ docid_b = document_b.get_docid()
+ if docid_a < docid_b:
+ return -1
+ elif docid_a > docid_b:
+ return 1
+ return 0
+
+ documents.sort(cmp=_cmp, reverse=sort_reverse)
+
+
+def _to_native(value):
+ if isinstance(value, list):
+ return [_to_native(e) for e in value]
+ elif isinstance(value, dict):
+ return dict([(_to_native(k), _to_native(v)) for k, v in value.items()])
+ elif isinstance(value, unicode):
+ return unicode(value)
+ elif isinstance(value, str):
+ return str(value)
+ elif isinstance(value, int):
+ return int(value)
+ elif isinstance(value, float):
+ return float(value)
+ return value