#
# Author: Sascha Silbe
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Gdatastore metadata index interface."""

import ast
import logging
import os
import sys
import time

import xapian
from xapian import Document, Enquire, Query, WritableDatabase

_CURRENT_VERSION = 1

# Properties indexed as Xapian terms; 'prefix' is the Xapian term prefix,
# 'type' the type the value is coerced to before indexing.
_STANDARD_TERMS = {
    'activity': {'prefix': 'Xactname', 'type': str},
    'activity_id': {'prefix': 'Xactid', 'type': str},
    'description': {'prefix': 'Xdesc', 'type': unicode},
    'keep': {'prefix': 'Xkeep', 'type': str},
    'mime_type': {'prefix': 'T', 'type': str},
    'tags': {'prefix': 'K', 'type': unicode},
    'title': {'prefix': 'S', 'type': unicode},
    'tree_id': {'prefix': 'Xtree', 'type': str},
    'version_id': {'prefix': 'Xversion', 'type': str},
}
# Xapian value slot numbers, used for sorting and range queries.
_VALUE_TREE_ID = 0
_VALUE_VERSION_ID = 1
_VALUE_MTIME = 2
_VALUE_SIZE = 3
_VALUE_CTIME = 4
# Properties stored in Xapian value slots (sortable / range-searchable).
_STANDARD_VALUES = {
    'creation_time': {'number': _VALUE_CTIME, 'type': float},
    'filesize': {'number': _VALUE_SIZE, 'type': int},
    'timestamp': {'number': _VALUE_MTIME, 'type': float},
    'tree_id': {'number': _VALUE_TREE_ID, 'type': str},
    'version_id': {'number': _VALUE_VERSION_ID, 'type': str},
}
# Properties that are never indexed (e.g. large binary previews).
_IGNORE_PROPERTIES = ['preview']
_PREFIX_FULL_VALUE = 'Xf'
_PREFIX_OBJECT_ID = 'Q'
_LIMIT_MAX = 2 ** 31 - 1
_DOCID_REVERSE_MAP = {True: Enquire.DESCENDING, False: Enquire.ASCENDING}


class DSIndexError(Exception):
    """Raised for index-level errors (e.g. unsupported on-disk version)."""
    pass


class TermGenerator(xapian.TermGenerator):
    """Generate Xapian terms and values for a document from its metadata."""

    def __init__(self):
        self._document = None
        xapian.TermGenerator.__init__(self)

    def index_document(self, document, properties):
        """Add terms and values for all given properties to document.

        properties is a mapping of property name to value; it is not
        modified.
        """
        # Value-stored properties go into fixed value slots so they can be
        # used for sorting and range queries.
        for name, info in _STANDARD_VALUES.items():
            if name not in properties:
                continue

            document.add_value(info['number'],
                               _serialise_value(info, properties[name]))

        self._document = document
        self.set_document(document)

        # Work on a copy: _index_known() pops entries from it.
        properties = dict(properties)
        self._index_known(properties)
        self._index_unknown(properties)

    def _index_known(self, properties):
        """Index standard properties and remove them from the input."""
        for name, info in _STANDARD_TERMS.items():
            if name not in properties:
                continue

            value = info['type'](properties.pop(name))
            self._index_property(value, info['prefix'])

    def _index_unknown(self, properties):
        """
        Index all given properties.

        Expects not to get passed any standard term-stored property.
        """
        for name, value in properties.items():
            if name in _IGNORE_PROPERTIES or name in _STANDARD_VALUES:
                continue

            self._index_property(value, _prefix_for_unknown(name))

    def _index_property(self, value, prefix):
        """Index a single property value under the given term prefix."""
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        elif not isinstance(value, str):
            value = str(value)

        # Hardcoded Xapian term length limit
        if len(prefix + value) < 240:
            # We need to add the full value (i.e. not split into words), too,
            # so we can enumerate unique values. It also simplifies setting up
            # dictionary-based queries.
            self._document.add_term(_PREFIX_FULL_VALUE + prefix + value)

        self.index_text(value, 1, prefix)
        self.increase_termpos()


class QueryParser(xapian.QueryParser):
    """
    QueryParser that understands dictionaries and Xapian query strings.

    The dictionary may contain property names as keys and basic types
    (exact match), 2-tuples (range, only valid for value-stored standard
    properties) and lists (multiple exact matches joined with OR) as
    values.
    An empty dictionary matches everything. Queries from different keys
    (i.e. different property names) are joined with AND.
    Full text search (Xapian query string) is only supported for standard
    properties.
    """

    _FLAGS = (xapian.QueryParser.FLAG_PHRASE |
              xapian.QueryParser.FLAG_BOOLEAN |
              xapian.QueryParser.FLAG_LOVEHATE |
              xapian.QueryParser.FLAG_WILDCARD)

    def __init__(self):
        xapian.QueryParser.__init__(self)
        for name, info in _STANDARD_TERMS.items():
            self.add_prefix(name, info['prefix'])
            # Also allow unprefixed full-text search over this property.
            self.add_prefix('', info['prefix'])

    def _parse_query_term(self, prefix, value):
        """Build an exact-match (or OR-of-exact-matches) term query."""
        if isinstance(value, list):
            subqueries = [self._parse_query_term(prefix, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        return Query(_PREFIX_FULL_VALUE + prefix + str(value))

    def _parse_query_value_range(self, info, value):
        """Build a value range query from a (start, end) tuple."""
        if len(value) != 2:
            raise TypeError('Only tuples of size 2 have a defined meaning.'
                            ' Did you mean to pass a list instead?')

        start, end = value
        return Query(Query.OP_VALUE_RANGE, info['number'],
                     _serialise_value(info, start),
                     _serialise_value(info, end))

    def _parse_query_value(self, info, value):
        """Build a query for a value-stored property.

        Lists mean OR-of-matches, tuples mean ranges, dicts are the
        compatibility form of ranges, anything else is an exact match
        (expressed as a degenerate range).
        """
        if isinstance(value, list):
            subqueries = [self._parse_query_value(info, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        elif isinstance(value, tuple):
            return self._parse_query_value_range(info, value)

        elif isinstance(value, dict):
            # compatibility option for timestamp: {'start': 0, 'end': 1}
            start = value.get('start', 0)
            end = value.get('end', sys.maxint)
            return self._parse_query_value_range(info, (start, end))

        else:
            return self._parse_query_value_range(info, (value, value))

    def _parse_query_xapian(self, query_str):
        """Parse a Xapian full-text query string."""
        return xapian.QueryParser.parse_query(self, query_str,
                                              QueryParser._FLAGS, '')

    def parse_datastore_query(self, query_dict, query_string):
        """Convert a datastore query to a xapian.Query.

        query_dict maps property names to match specifications (see class
        docstring); query_string is an optional Xapian full-text query.
        All resulting subqueries are joined with AND.
        """
        logging.debug('query_dict=%r, query_string=%r', query_dict,
                      query_string)
        queries = []
        query_dict = dict(query_dict or {})

        if query_string is not None:
            queries.append(self._parse_query_xapian(str(query_string)))

        for name, value in query_dict.items():
            if name in _STANDARD_TERMS:
                prefix = _STANDARD_TERMS[name]['prefix']
                query = self._parse_query_term(prefix, value)
            elif name in _STANDARD_VALUES:
                info = _STANDARD_VALUES[name]
                query = self._parse_query_value(info, value)
            else:
                logging.warning('Unknown term: %r=%r', name, value)
                continue

            queries.append(query)

        if not queries:
            # Empty query matches everything.
            queries.append(Query(''))

        logging.debug('queries: %r', [str(query) for query in queries])
        return Query(Query.OP_AND, queries)


class Index(object):
    """Xapian-backed metadata index for the datastore.

    Object ids are (tree_id, version_id) tuples.
    """

    def __init__(self, base_dir):
        self._base_dir = base_dir
        self._database = None

        if not os.path.exists(self._base_dir):
            os.makedirs(self._base_dir)
            self._create_database()

        self._migrate()
        self._query_parser = QueryParser()
        self._query_parser.set_database(self._database)

    def close(self):
        """Close index database if it is open."""
        if not self._database:
            return

        self._database.close()
        self._database = None

    def contains(self, object_id):
        """Return True if object_id is in the index."""
        postings = self._database.postlist(_object_id_term(object_id))
        try:
            _ = postings.next()
        except StopIteration:
            return False
        return True

    def delete(self, object_id):
        """Remove the entry for object_id from the index."""
        object_id_term = _object_id_term(object_id)

        if __debug__:
            # Sanity check: the object id term must match exactly one
            # document. Only enabled in debug runs (stripped under -O).
            enquire = Enquire(self._database)
            enquire.set_query(Query(object_id_term))
            documents = [hit.document for hit in enquire.get_mset(0, 2, 2)]
            assert len(documents) == 1

        self._database.delete_document(object_id_term)

    def find(self, query_dict, query_string, options):
        """Search the index.

        Returns a tuple of (entries, total_count) where entries is a list
        of metadata dictionaries and total_count a lower bound on the
        number of matches.
        Supported options: offset, limit, order_by (e.g. ['+timestamp']),
        all_versions, check_at_least.
        """
        offset = options.pop('offset', 0)
        limit = options.pop('limit', _LIMIT_MAX)
        order_by = options.pop('order_by', ['+timestamp'])[0]
        all_versions = options.pop('all_versions', False)
        check_at_least = options.pop('check_at_least', offset + limit + 1)

        enquire = Enquire(self._database)
        query = self._query_parser.parse_datastore_query(query_dict,
                                                         query_string)
        enquire.set_query(query)

        sort_reverse = {'+': True, '-': False}[order_by[0]]
        try:
            sort_value_nr = _STANDARD_VALUES[order_by[1:]]['number']
        except KeyError:
            logging.warning('Trying to order by unknown property: %r',
                            order_by[1:])
            sort_value_nr = _VALUE_MTIME

        enquire.set_sort_by_value(sort_value_nr, reverse=sort_reverse)
        enquire.set_docid_order(_DOCID_REVERSE_MAP[sort_reverse])

        if not all_versions:
            # Collapse all versions of an entry to its latest one.
            enquire.set_collapse_key(_VALUE_TREE_ID)

        if not all_versions and order_by != '+timestamp':
            # Xapian doesn't support using a different sort order while
            # collapsing (which needs to be timestamp in our case), so
            # we need to query everything and sort+limit ourselves.
            enquire.set_sort_by_value(_VALUE_MTIME, True)
            enquire.set_docid_order(enquire.ASCENDING)
            query_result = enquire.get_mset(0, _LIMIT_MAX, _LIMIT_MAX)
        else:
            logging.debug('Offset/limit using Xapian: %d %d %d', offset,
                          limit, check_at_least)
            query_result = enquire.get_mset(offset, limit, check_at_least)

        total_count = query_result.get_matches_lower_bound()
        documents = [hit.document for hit in query_result]

        if (not all_versions) and (order_by != '+timestamp'):
            _sort_documents(documents, sort_value_nr, sort_reverse)
            # Apply offset and limit ourselves since we queried everything
            # above (previously the offset was silently ignored here).
            del documents[offset + limit:]
            del documents[:offset]

        entries = [deserialise_metadata(document.get_data())
                   for document in documents]
        return entries, total_count

    def find_unique_values(self, property):
        """Return all unique values the given property has in the index."""
        if property in _STANDARD_TERMS:
            prefix = _PREFIX_FULL_VALUE + _STANDARD_TERMS[property]['prefix']
        else:
            prefix = _PREFIX_FULL_VALUE + _prefix_for_unknown(property)

        return [term.term[len(prefix):]
                for term in self._database.allterms(prefix)]

    def retrieve(self, object_id):
        """Return the metadata dictionary stored for object_id."""
        postings = self._database.postlist(_object_id_term(object_id))
        document = self._database.get_document(postings.next().docid)
        return deserialise_metadata(document.get_data())

    def store(self, object_id, properties):
        """Add or update the index entry for object_id."""
        logging.debug('store(%r, %r)', object_id, properties)
        assert (properties['tree_id'],
                properties['version_id']) == object_id

        id_term = _object_id_term(object_id)
        document = Document()
        logging.debug('serialised=%r', serialiase_metadata(properties))
        document.set_data(serialiase_metadata(properties))
        document.add_term(id_term)

        term_generator = TermGenerator()
        term_generator.index_document(document, properties)

        assert (document.get_value(_VALUE_TREE_ID),
                document.get_value(_VALUE_VERSION_ID)) == object_id

        self._database.replace_document(id_term, document)

    def _create_database(self):
        """Create a fresh index database tagged with the current version."""
        database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN)
        database.set_metadata('gdatastore_version', str(_CURRENT_VERSION))
        database.close()

    def _migrate(self):
        """Open the database, retrying on lock contention, and check the
        on-disk format version.

        Raises DSIndexError if the database cannot be locked or its
        version is newer than this code supports.
        """
        for try_count in range(10):
            try:
                self._database = WritableDatabase(self._base_dir,
                                                  xapian.DB_CREATE_OR_OPEN)
            except xapian.DatabaseLockError:
                logging.error("Couldn't lock Xapian database (try #%d)",
                              try_count)
                time.sleep(1)
            else:
                break
        else:
            # All retries failed; fail explicitly instead of continuing
            # with self._database unset.
            raise DSIndexError("Couldn't lock Xapian database %r"
                               % (self._base_dir, ))

        version = int(self._database.get_metadata('gdatastore_version'))
        if version > _CURRENT_VERSION:
            raise DSIndexError('Unsupported index version: %d > %d'
                               % (version, _CURRENT_VERSION))


def deserialise_metadata(serialised):
    """Deserialise a string generated by serialiase_metadata().

    Only literal values (as produced by serialiase_metadata()) are
    accepted; unlike eval(), ast.literal_eval() cannot execute
    arbitrary code.
    """
    return ast.literal_eval(serialised)


def serialiase_metadata(metadata):
    # NOTE: name keeps its historical misspelling; it is part of the
    # public module interface.
    return repr(_to_native(metadata))


def _object_id_term(object_id):
    """Return the unique Xapian term for a (tree_id, version_id) pair."""
    return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id


def _prefix_for_unknown(name):
    """Return the term prefix for a non-standard property name."""
    return 'Xu%d:%s' % (len(name), unicode(name).encode('utf-8'))


def _serialise_value(info, value):
    """Serialise value for storage in a Xapian value slot."""
    if info['type'] in (float, int, long):
        # sortable_serialise() keeps numeric sort order in string form.
        return xapian.sortable_serialise(info['type'](value))
    elif info['type'] == unicode:
        return unicode(value).encode('utf-8')

    return str(info['type'](value))


def _sort_documents(documents, sort_value_nr, sort_reverse):
    """Sort documents in-place by the given value slot, using the docid
    as a tie-breaker for a stable, deterministic order."""
    def _cmp(document_a, document_b):
        value_a = document_a.get_value(sort_value_nr)
        value_b = document_b.get_value(sort_value_nr)
        if value_a < value_b:
            return -1
        elif value_a > value_b:
            return 1

        docid_a = document_a.get_docid()
        docid_b = document_b.get_docid()
        if docid_a < docid_b:
            return -1
        elif docid_a > docid_b:
            return 1

        return 0

    documents.sort(cmp=_cmp, reverse=sort_reverse)


def _to_native(value):
    """Recursively convert value to plain native types so that repr()
    yields a literal-eval-able string (strips any subclass behaviour)."""
    if isinstance(value, list):
        return [_to_native(e) for e in value]
    elif isinstance(value, dict):
        return dict([(_to_native(k), _to_native(v))
                     for k, v in value.items()])
    elif isinstance(value, unicode):
        return unicode(value)
    elif isinstance(value, str):
        return str(value)
    elif isinstance(value, int):
        return int(value)
    elif isinstance(value, float):
        return float(value)

    return value