diff options
Diffstat (limited to 'src/gdatastore/index.py')
-rw-r--r-- | src/gdatastore/index.py | 394 |
1 files changed, 394 insertions, 0 deletions
diff --git a/src/gdatastore/index.py b/src/gdatastore/index.py new file mode 100644 index 0000000..61e18da --- /dev/null +++ b/src/gdatastore/index.py @@ -0,0 +1,394 @@ +# +# Author: Sascha Silbe <sascha-pgp@silbe.org> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Gdatastore metadata index interface +""" + +import logging +import os +import sys + +import xapian +from xapian import Document, Enquire, Query, WritableDatabase + + +_CURRENT_VERSION = 1 +_STANDARD_TERMS = { + 'activity': {'prefix': 'Xactname', 'type': str}, + 'activity_id': {'prefix': 'Xactid', 'type': str}, + 'keep': {'prefix': 'Xkeep', 'type': str}, + 'mime_type': {'prefix': 'T', 'type': str}, + 'tags': {'prefix': 'K', 'type': unicode}, + 'title': {'prefix': 'S', 'type': unicode}, + 'tree_id': {'prefix': 'Xtree', 'type': str}, + 'version_id': {'prefix': 'Xversion', 'type': str}, +} +_VALUE_TREE_ID = 0 +_VALUE_VERSION_ID = 1 +_VALUE_MTIME = 2 +_VALUE_SIZE = 3 +_VALUE_CTIME = 4 +_STANDARD_VALUES = { + 'creation_time': {'number': _VALUE_CTIME, 'type': float}, + 'filesize': {'number': _VALUE_SIZE, 'type': int}, + 'timestamp': {'number': _VALUE_MTIME, 'type': float}, + 'tree_id': {'number': _VALUE_TREE_ID, 'type': str}, + 'version_id': {'number': _VALUE_VERSION_ID, 'type': str}, +} +_IGNORE_PROPERTIES = ['preview'] +_PREFIX_FULL_VALUE = 'Xf' +_PREFIX_OBJECT_ID = 'Q' +_LIMIT_MAX = 2 ** 31 - 1 +_DOCID_REVERSE_MAP = {True: Enquire.DESCENDING, False: Enquire.ASCENDING} + + 
class DSIndexError(Exception):
    """Raised for index-level failures, e.g. an unsupported index version."""


class TermGenerator(xapian.TermGenerator):
    """Term generator that indexes a metadata dictionary into a document."""

    def __init__(self):
        self._document = None
        xapian.TermGenerator.__init__(self)

    def index_document(self, document, properties):
        """
        Add values and terms derived from properties to document.

        Every property listed in _STANDARD_VALUES is stored in its value
        slot. Afterwards the properties listed in _STANDARD_TERMS are indexed
        with their fixed prefix and all remaining properties with a prefix
        derived from the property name.

        The passed-in dictionary is not modified.
        """
        for name, info in _STANDARD_VALUES.items():
            if name not in properties:
                continue

            document.add_value(info['number'],
                               _serialise_value(info, properties[name]))

        self._document = document
        self.set_document(document)

        # Work on a copy: _index_known() removes the entries it handles.
        properties = dict(properties)
        self._index_known(properties)
        self._index_unknown(properties)

    def _index_known(self, properties):
        """Index standard properties and remove them from the input."""
        for name, info in _STANDARD_TERMS.items():
            if name not in properties:
                continue

            self._index_property(properties.pop(name), info['prefix'])

    def _index_unknown(self, properties):
        """
        Index all given properties.

        Expects not to get passed any standard term-stored property.
        Properties listed in _IGNORE_PROPERTIES or _STANDARD_VALUES are
        skipped.
        """
        for name, value in properties.items():
            if name in _IGNORE_PROPERTIES or name in _STANDARD_VALUES:
                continue

            # _index_property() takes the value first and the prefix second.
            self._index_property(_encode_utf8(value),
                                 _prefix_for_unknown(name))

    def _index_property(self, value, prefix):
        """Index value both as a single full-value term and as text."""
        # We need to add the full value (i.e. not split into words), too, so
        # we can enumerate unique values. It also simplifies setting up
        # dictionary-based queries.
        self._document.add_term(_PREFIX_FULL_VALUE + prefix + value)
        self.index_text(value, 1, prefix)
        self.increase_termpos()


class QueryParser(xapian.QueryParser):
    """
    QueryParser that understands dictionaries and Xapian query strings.

    The dictionary may contain property names as keys and basic types
    (exact match), 2-tuples (range, only valid for value-stored
    standard properties) and lists (multiple exact matches joined with
    OR) as values.
    An empty dictionary matches everything. Queries from different keys
    (i.e. different property names) are joined with AND.

    Full text search (Xapian query string) is only supported for standard
    properties.
    """

    _FLAGS = (xapian.QueryParser.FLAG_PHRASE |
              xapian.QueryParser.FLAG_BOOLEAN |
              xapian.QueryParser.FLAG_LOVEHATE |
              xapian.QueryParser.FLAG_WILDCARD)

    def __init__(self):
        xapian.QueryParser.__init__(self)
        for name, info in _STANDARD_TERMS.items():
            # Register each prefix under the property name and under the
            # empty field name.
            self.add_prefix(name, info['prefix'])
            self.add_prefix('', info['prefix'])

    def _parse_query_term(self, prefix, value):
        """
        Build an exact-match query for a term-stored property.

        A list is turned into an OR over the queries of its elements.
        """
        if isinstance(value, list):
            subqueries = [self._parse_query_term(prefix, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        # Encode rather than str(): str() raises UnicodeEncodeError for
        # unicode values containing non-ASCII characters.
        return Query(_PREFIX_FULL_VALUE + prefix + _encode_utf8(value))

    def _parse_query_value_range(self, info, value):
        """
        Build a value range query from a (start, end) pair.

        Raises TypeError if value does not have exactly two elements.
        """
        if len(value) != 2:
            raise TypeError('Only tuples of size 2 have a defined meaning.'
                            ' Did you mean to pass a list instead?')

        start, end = value
        return Query(Query.OP_VALUE_RANGE, info['number'],
                     _serialise_value(info, start),
                     _serialise_value(info, end))

    def _parse_query_value(self, info, value):
        """
        Build a query for a value-stored property.

        Lists are joined with OR, tuples and dictionaries describe a range
        and anything else is matched exactly (as the range value..value).
        """
        if isinstance(value, list):
            subqueries = [self._parse_query_value(info, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        elif isinstance(value, tuple):
            return self._parse_query_value_range(info, value)

        elif isinstance(value, dict):
            # compatibility option for timestamp: {'start': 0, 'end': 1}
            start = value.get('start', 0)
            end = value.get('end', sys.maxint)
            return self._parse_query_value_range(info, (start, end))

        else:
            return self._parse_query_value_range(info, (value, value))

    def _parse_query_xapian(self, query_str):
        """Parse a Xapian query string using the flags in _FLAGS."""
        return xapian.QueryParser.parse_query(self, query_str,
                                              QueryParser._FLAGS, '')

    def parse_datastore_query(self, query_dict, query_string):
        """
        Combine a query dictionary and a query string into a single Query.

        query_dict may be None or empty and query_string may be None. Keys
        of query_dict that are neither in _STANDARD_TERMS nor in
        _STANDARD_VALUES are logged and ignored. All partial queries are
        joined with AND; if there are none, the result matches everything.
        """
        logging.debug('query_dict=%r, query_string=%r', query_dict,
                      query_string)
        queries = []
        query_dict = dict(query_dict or {})

        if query_string is not None:
            # Encode rather than str() so non-ASCII unicode query strings
            # do not raise UnicodeEncodeError.
            queries.append(
                self._parse_query_xapian(_encode_utf8(query_string)))

        for name, value in query_dict.items():
            if name in _STANDARD_TERMS:
                prefix = _STANDARD_TERMS[name]['prefix']
                query = self._parse_query_term(prefix, value)
            elif name in _STANDARD_VALUES:
                info = _STANDARD_VALUES[name]
                query = self._parse_query_value(info, value)
            else:
                logging.warning('Unknown term: %r=%r', name, value)
                continue

            queries.append(query)

        if not queries:
            # The empty term query is used as "match everything".
            queries.append(Query(''))

        logging.debug('queries: %r', [str(query) for query in queries])
        return Query(Query.OP_AND, queries)


class Index(object):
    """Xapian-backed metadata index stored in a directory."""

    def __init__(self, base_dir):
        """
        Open the index in base_dir, creating it if the directory is missing.

        Raises DSIndexError if the stored index version is newer than
        _CURRENT_VERSION.
        """
        self._base_dir = base_dir
        self._database = None

        if not os.path.exists(self._base_dir):
            os.makedirs(self._base_dir)
            self._create_database()

        self._migrate()
        self._query_parser = QueryParser()
        self._query_parser.set_database(self._database)

    def close(self):
        """Close index database if it is open."""
        if not self._database:
            return

        self._database.close()
        self._database = None

    def contains(self, object_id):
        """Return True if an entry for object_id is stored in the index."""
        postings = self._database.postlist(_object_id_term(object_id))
        # A single posting is enough to know the term is in use.
        for _ in postings:
            return True
        return False

    def delete(self, object_id):
        """Remove the entry for object_id from the index."""
        object_id_term = _object_id_term(object_id)
        if __debug__:
            # Sanity check: exactly one document must carry this id term.
            enquire = Enquire(self._database)
            enquire.set_query(Query(object_id_term))
            documents = [hit.document for hit in enquire.get_mset(0, 2, 2)]
            assert len(documents) == 1

        self._database.delete_document(object_id_term)

    def find(self, query_dict, query_string, options):
        """
        Return the entries matching the given query.

        query_dict and query_string are passed on to
        QueryParser.parse_datastore_query(). options is a dictionary from
        which the keys 'offset', 'limit', 'order_by', 'all_versions' and
        'check_at_least' are removed; missing keys fall back to defaults.
        'order_by' is a list of which only the first element is used: a
        '+' or '-' followed by the name of a value-stored property.

        Unless 'all_versions' is set, results are collapsed on the tree_id
        value.

        Returns a tuple (entries, total_count): the list of deserialised
        metadata dictionaries and the lower bound of the number of matches
        as reported by Xapian.

        Raises KeyError if 'order_by' does not start with '+' or '-'.
        """
        offset = options.pop('offset', 0)
        limit = options.pop('limit', _LIMIT_MAX)
        order_by = options.pop('order_by', ['+timestamp'])[0]
        all_versions = options.pop('all_versions', False)
        check_at_least = options.pop('check_at_least', offset + limit + 1)

        enquire = Enquire(self._database)
        query = self._query_parser.parse_datastore_query(query_dict,
                                                         query_string)
        enquire.set_query(query)

        sort_reverse = {'+': True, '-': False}[order_by[0]]
        try:
            sort_value_nr = _STANDARD_VALUES[order_by[1:]]['number']
        except KeyError:
            logging.warning('Trying to order by unknown property: %r',
                            order_by[1:])
            sort_value_nr = _VALUE_MTIME

        enquire.set_sort_by_value(sort_value_nr, reverse=sort_reverse)
        enquire.set_docid_order(_DOCID_REVERSE_MAP[sort_reverse])

        if not all_versions:
            enquire.set_collapse_key(_VALUE_TREE_ID)

        manual_sort = (not all_versions) and (order_by != '+timestamp')
        if manual_sort:
            # Xapian doesn't support using a different sort order while
            # collapsing (which needs to be timestamp in our case), so
            # we need to query everything and sort+limit ourselves.
            enquire.set_sort_by_value(_VALUE_MTIME, True)
            enquire.set_docid_order(enquire.ASCENDING)
            query_result = enquire.get_mset(0, _LIMIT_MAX, _LIMIT_MAX)
        else:
            logging.debug('Offset/limit using Xapian: %d %d %d', offset,
                          limit, check_at_least)
            query_result = enquire.get_mset(offset, limit, check_at_least)

        total_count = query_result.get_matches_lower_bound()
        documents = [hit.document for hit in query_result]

        if manual_sort:
            _sort_documents(documents, sort_value_nr, sort_reverse)
            # Apply the same window Xapian would have applied: skip the
            # first `offset` results and keep at most `limit` of the rest.
            documents = documents[offset:offset + limit]

        entries = [deserialise_metadata(document.get_data())
                   for document in documents]

        return entries, total_count

    def store(self, object_id, properties):
        """
        Add or replace the entry for object_id.

        object_id must equal (properties['tree_id'],
        properties['version_id']); this is checked with an assertion.
        """
        logging.debug('store(%r, %r)', object_id, properties)
        assert (properties['tree_id'], properties['version_id']) == object_id
        id_term = _object_id_term(object_id)
        document = Document()
        # Serialise once and use the result for both logging and storage.
        serialised = serialiase_metadata(properties)
        logging.debug('serialised=%r', serialised)
        document.set_data(serialised)
        document.add_term(id_term)
        term_generator = TermGenerator()
        term_generator.index_document(document, properties)
        assert (document.get_value(_VALUE_TREE_ID),
                document.get_value(_VALUE_VERSION_ID)) == object_id
        self._database.replace_document(id_term, document)

    def _create_database(self):
        """Create the database and record the current index version."""
        database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN)
        database.set_metadata('gdatastore_version', str(_CURRENT_VERSION))
        database.close()

    def _migrate(self):
        """
        Open the database and check its version.

        Raises DSIndexError if the stored version is newer than
        _CURRENT_VERSION.
        """
        self._database = WritableDatabase(self._base_dir,
                                          xapian.DB_CREATE_OR_OPEN)
        # NOTE(review): int() raises ValueError if the version metadata is
        # missing or not a number - confirm whether that can happen for a
        # pre-existing directory that does not contain an index yet.
        version = int(self._database.get_metadata('gdatastore_version'))

        if version > _CURRENT_VERSION:
            raise DSIndexError('Unsupported index version: %d > %d' %
                               (version, _CURRENT_VERSION))


def deserialise_metadata(serialised):
    """Deserialise a string generated by serialise_metadata().

    Do NOT pass any value that might have been modified since it was generated
    by serialiase_metadata().
    """
    # SECURITY: eval() executes arbitrary Python expressions. This is only
    # safe as long as the input comes straight from serialiase_metadata().
    return eval(serialised)


def serialiase_metadata(metadata):
    """Serialise metadata into a string for deserialise_metadata()."""
    return repr(_to_native(metadata))


# Correctly spelled alias; the misspelt name is kept for existing callers.
serialise_metadata = serialiase_metadata


def _object_id_term(object_id):
    """Return the term identifying the (tree_id, version_id) pair."""
    return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id


def _prefix_for_unknown(name):
    """Return the term prefix used for the non-standard property name."""
    return 'Xu%d:%s' % (len(name), unicode(name).encode('utf-8'))


def _encode_utf8(value):
    """
    Return value as a byte string.

    unicode objects are encoded as UTF-8, str objects are returned
    unchanged and everything else is converted using str().
    """
    if isinstance(value, unicode):
        return value.encode('utf-8')
    elif not isinstance(value, str):
        return str(value)
    return value


def _serialise_value(info, value):
    """
    Serialise value for storage in the value slot described by info.

    Numeric types are passed through xapian.sortable_serialise(), unicode
    is encoded as UTF-8 and everything else is converted to info['type']
    and then to str.
    """
    if info['type'] in (float, int, long):
        return xapian.sortable_serialise(info['type'](value))
    elif info['type'] == unicode:
        return unicode(value).encode('utf-8')

    return str(info['type'](value))


def _sort_documents(documents, sort_value_nr, sort_reverse):
    """
    Sort documents in place by the value in slot sort_value_nr.

    Documents with equal values are ordered by document id. The whole
    ordering is reversed if sort_reverse is true.
    """
    def _key(document):
        return (document.get_value(sort_value_nr), document.get_docid())

    documents.sort(key=_key, reverse=sort_reverse)


def _to_native(value):
    """
    Recursively convert value to plain built-in types.

    Lists and dictionaries are rebuilt with converted members; instances
    of unicode, str, int, long and float (including subclasses) are
    converted to exactly that type. Any other value is returned unchanged.
    """
    if isinstance(value, list):
        return [_to_native(e) for e in value]
    elif isinstance(value, dict):
        return dict((_to_native(k), _to_native(v))
                    for k, v in value.items())
    elif isinstance(value, unicode):
        return unicode(value)
    elif isinstance(value, str):
        return str(value)
    elif isinstance(value, int):
        return int(value)
    elif isinstance(value, long):
        return long(value)
    elif isinstance(value, float):
        return float(value)
    return value