diff options
author | Christophe Gueret <christophe.gueret@gmail.com> | 2011-10-05 16:15:42 (GMT) |
---|---|---|
committer | Christophe Gueret <christophe.gueret@gmail.com> | 2011-10-05 16:15:42 (GMT) |
commit | ef2bc076a3764c29db6853715bb22a4168806309 (patch) | |
tree | 1d02d18eda35aa9824478cebdfc0ebe47cd67c72 /src/semanticstore/indexstore.py | |
parent | b44e149a4b5e8bd257633c1d45a3346a6fcff859 (diff) |
Started to better split the journal from the rest of the semanticxo
stack
Diffstat (limited to 'src/semanticstore/indexstore.py')
-rw-r--r-- | src/semanticstore/indexstore.py | 241 |
1 files changed, 241 insertions, 0 deletions
# Copyright (C) 2008, One Laptop Per Child
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

import logging
import os

import gobject

from semanticstore import layoutmanager
from semanticstore.layoutmanager import MAX_QUERY_LIMIT
from semanticstore.metadatastore import OLPC_TERMS
from semanticxo.sparql import SPARQL

# Positional identifiers for sortable values, kept for API compatibility
# with the Xapian-based index store this module replaces.
_VALUE_UID = 0
_VALUE_TIMESTAMP = 1
_VALUE_TITLE = 2
# 3 reserved for version support
_VALUE_FILESIZE = 4
_VALUE_CREATION_TIME = 5

# Term prefixes inherited from the Xapian index layout.
_PREFIX_NONE = 'N'
_PREFIX_FULL_VALUE = 'F'
_PREFIX_UID = 'Q'
_PREFIX_ACTIVITY = 'A'
_PREFIX_ACTIVITY_ID = 'I'
_PREFIX_MIME_TYPE = 'M'
_PREFIX_KEEP = 'K'

# Force a flush every _n_ changes to the db
_FLUSH_THRESHOLD = 20

# Force a flush after _n_ seconds since the last change to the db
_FLUSH_TIMEOUT = 5

_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'preview']

_MAX_RESULTS = int(2 ** 31 - 1)

# Metadata properties that a query dictionary may filter on.
_QUERY_TERMS = ['uid', 'activity', 'activity_id', 'mime_type', 'timestamp',
                'filesize', 'creation_time']


def _escape_literal(value):
    """Escape backslashes and double quotes so *value* can be embedded
    inside a double-quoted SPARQL string literal without terminating it
    (and without letting a crafted value inject extra graph patterns).
    """
    return str(value).replace('\\', '\\\\').replace('"', '\\"')


class Enquire(object):
    """Translate a datastore query dictionary into a SPARQL query and
    run it against the triple store.

    Mirrors the interface of Xapian's ``Enquire`` object so the calling
    code in :class:`IndexStore` stays unchanged.
    """

    def __init__(self):
        # Accumulated SPARQL graph patterns; filled by parse_query().
        self._graph_patterns = []

    def parse_query(self, query_dict, query_string):
        """Build the graph patterns for *query_dict*.

        query_dict -- property-name -> value filters; unknown property
            names are logged and ignored.
        query_string -- free-text search string; currently unsupported
            (TODO below) and silently ignored.
        """
        logging.debug('[IDX] parse_query %r %r', query_dict, query_string)
        patterns = []
        query_dict = dict(query_dict)

        # Full text search
        if query_string is not None:
            # TODO Do something with a regexp in title
            pass

        # Properties based search
        for name, value in query_dict.items():
            if name in _QUERY_TERMS:
                # Escape the value: a raw '"' would break out of the
                # literal and corrupt the query.
                pattern = "?entry <%s> \"%s\"." % (OLPC_TERMS[name],
                                                   _escape_literal(value))
                patterns.append(pattern)
            else:
                logging.warning('Unknown term: %r=%r', name, value)

        # No filters at all: match every datastore object.
        if not patterns:
            pattern = "?entry <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <%s>." % OLPC_TERMS['DSObject']
            patterns.append(pattern)

        # Add a pattern to bind the UID of every matching entry.
        patterns.append("?entry <%s> ?uid." % OLPC_TERMS['uid'])

        # Save the query
        self._graph_patterns = patterns
        logging.debug('[IDX] patterns %r', [str(q) for q in patterns])

    def set_sort_by_value(self, name, incremental):
        # TODO Convert order by into SPARQL ORDER BY clause (map word
        # to URI and adjust +-)
        pass

    def get_set(self, offset, limit):
        """Execute the query and return at most *limit* matching UIDs,
        starting at *offset*.

        Bug fix: the original implementation ignored both arguments and
        always returned every match, making the datastore's paging
        parameters dead code.
        """
        sparql = SPARQL()
        query = "SELECT ?uid WHERE {\n %s \n}" % \
            "\n".join(self._graph_patterns)
        results = [result['uid'] for result in sparql.execute_select(query)]
        return results[offset:offset + limit]


class IndexStore(object):
    '''
    Index metadata and provide rich query facilities on it.
    '''

    # Maps an order_by token to the (value id, ascending) pair expected
    # by Enquire.set_sort_by_value(); replaces a 16-branch elif chain.
    _SORT_OPTIONS = {
        '+timestamp': (_VALUE_TIMESTAMP, True),
        '-timestamp': (_VALUE_TIMESTAMP, False),
        '+title': (_VALUE_TITLE, True),
        '-title': (_VALUE_TITLE, False),
        '+filesize': (_VALUE_FILESIZE, True),
        '-filesize': (_VALUE_FILESIZE, False),
        '+creation_time': (_VALUE_CREATION_TIME, True),
        '-creation_time': (_VALUE_CREATION_TIME, False),
    }

    def __init__(self):
        # Location of the triple store endpoint.
        self._url = '127.0.0.1:8080'
        self._database = None
        self._flush_timeout = None
        self._pending_writes = 0
        # Marker file: its existence means the on-disk index is in sync.
        self._index_updated_path = os.path.join(
            layoutmanager.get_instance().get_root_path(), 'index_updated')

    def open_index(self):
        # The triple store is opened lazily by SPARQL(); nothing to do.
        pass

    def close_index(self):
        """Close index database if it is open."""
        if not self._database:
            return

        self._flush(True)
        self._database = None

    def remove_index(self):
        """Delete every file of the on-disk index, if present."""
        index_path = layoutmanager.get_instance().get_index_path()
        if not os.path.exists(index_path):
            return
        for entry in os.listdir(index_path):
            os.remove(os.path.join(index_path, entry))

    def contains(self, uid):
        """Return True if an entry with *uid* exists in the index."""
        enquire = Enquire()
        enquire.parse_query({'uid': uid}, None)
        query_result = enquire.get_set(0, 1)
        return len(query_result) == 1

    def store(self, uid, properties):
        logging.debug('[IDX] store ' + uid)
        # TODO needed ?

    def find(self, query):
        """Run *query* and return ``(uids, total_count)``.

        Recognised keys popped from *query*: 'offset', 'limit',
        'order_by' (list, only first element used) and 'query'
        (free-text string).  NOTE(review): total_count reflects the
        returned page, not the full match count — confirm callers.
        """
        logging.debug('[IDX] find ' + str(query))
        offset = query.pop('offset', 0)
        limit = query.pop('limit', MAX_QUERY_LIMIT)
        order_by = query.pop('order_by', [])
        query_string = query.pop('query', None)

        enquire = Enquire()
        enquire.parse_query(query, query_string)

        # Default sort is newest-first input order (+timestamp).
        if not order_by:
            order_by = '+timestamp'
        else:
            order_by = order_by[0]

        if order_by in self._SORT_OPTIONS:
            value, ascending = self._SORT_OPTIONS[order_by]
            enquire.set_sort_by_value(value, ascending)
        else:
            logging.warning('Unsupported property for sorting: %s', order_by)

        query_result = enquire.get_set(offset, limit)
        total_count = len(query_result)
        logging.debug('[IDX] query result %r', query_result)

        uids = list(query_result)

        return (uids, total_count)

    def delete(self, uid):
        # TODO Needed ?
        #self._database.delete_document(_PREFIX_FULL_VALUE + _PREFIX_UID + uid)
        #self._flush()
        pass

    def get_activities(self):
        logging.debug('[IDX] get_activities')
        activities = []
        #prefix = _PREFIX_FULL_VALUE + _PREFIX_ACTIVITY
        #for term in self._database.allterms(prefix):
        #    activities.append(term.term[len(prefix):])
        return activities

    def flush(self):
        self._flush(True)

    def get_index_updated(self):
        # The marker file's existence is the "index is clean" flag.
        return os.path.exists(self._index_updated_path)

    index_updated = property(get_index_updated)

    def _set_index_updated(self, index_updated):
        """Create or remove the index_updated marker file."""
        if index_updated != self.index_updated:
            if index_updated:
                with open(self._index_updated_path, 'w') as marker:
                    # index_updated = True will happen every
                    # indexstore._FLUSH_TIMEOUT seconds, so it is ok
                    # to fsync
                    os.fsync(marker.fileno())
            else:
                os.remove(self._index_updated_path)

    def _flush_timeout_cb(self):
        self._flush(True)
        # Returning False removes the gobject timeout source.
        return False

    def _flush(self, force=False):
        """Called after any database mutation"""
        logging.debug('IndexStore.flush: force=%r _pending_writes=%r',
                      force, self._pending_writes)

        # Mark the index dirty until this flush cycle completes.
        self._set_index_updated(False)

        if self._flush_timeout is not None:
            gobject.source_remove(self._flush_timeout)
            self._flush_timeout = None

        self._pending_writes += 1
        if force or self._pending_writes > _FLUSH_THRESHOLD:
            self._pending_writes = 0
            self._set_index_updated(True)
        else:
            # Defer: flush automatically after _FLUSH_TIMEOUT seconds
            # of inactivity.
            self._flush_timeout = gobject.timeout_add_seconds(
                _FLUSH_TIMEOUT, self._flush_timeout_cb)