From b855a33f4a64e0c76994575bba5a89f14269d5e2 Mon Sep 17 00:00:00 2001 From: Christophe Gueret Date: Thu, 25 Aug 2011 12:37:15 +0000 Subject: Making some progress :) --- diff --git a/bin/datastore-service.py b/bin/datastore-service.py index e8d9961..6453d3e 100755 --- a/bin/datastore-service.py +++ b/bin/datastore-service.py @@ -1,5 +1,7 @@ #!/usr/bin/env python2 import sys +sys.path.append('/home/cgueret/Code/sugar-datastore/src') + import os import signal import logging @@ -11,6 +13,7 @@ from carquinyol.datastore import DataStore from sugar import logger + # Path handling profile = os.environ.get('SUGAR_PROFILE', 'default') base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) @@ -19,8 +22,9 @@ if not os.path.exists(log_dir): os.makedirs(log_dir) # setup logger -# logger.start('datastore') - +logger.start('datastore') +#logging.basicConfig(level=5, format='%(created)f %(levelname)s %(name)s: %(message)s', stream=sys.stderr) +#logging.getLogger('').setLevel(5) # build the datastore dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) diff --git a/bin/test.py b/bin/test.py index 4ccd715..a45b8c7 100644 --- a/bin/test.py +++ b/bin/test.py @@ -12,18 +12,18 @@ if os.path.exists("/tmp/olpc-session-bus"): os.environ["DBUS_SESSION_BUS_ADDRESS"] = "unix:path=/tmp/olpc-session-bus" if __name__ == "__main__": - try: - entry = datastore.create() - entry.metadata['title'] = 'Terminal-test' - print entry.metadata.get_dictionary().copy() - datastore.write(entry) - - query = {} - query['query'] = '*Terminal*' - objects, count = datastore.find(query, limit=2) - print objects, count - except dbus.DBusException: - print 'ERROR: Unable to connect to the datastore.\n' - except Exception, e: - print 'ERROR: %s' % (e) + entry = datastore.create() + entry.metadata['title'] = 'Terminal-test' + entry.metadata['activity'] = 'Terminal' + entry.metadata['activity_id'] = 'Terminal' + #datastore.write(entry) + #print "Saved %s" % entry.get_object_id() + entry.destroy() + + query = {} + #query['query'] = 'Terminal*' + query['activity_id'] = 'Terminal' + print "Search %r" % query + objects, count = datastore.find(query, limit=2) + print objects, count diff --git a/src/carquinyol/layoutmanager.py b/removed/layoutmanager.py index 3179a98..3179a98 100644 --- a/src/carquinyol/layoutmanager.py +++ b/removed/layoutmanager.py diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py index a8ec48f..82c5748 100644 --- a/src/carquinyol/datastore.py +++ b/src/carquinyol/datastore.py @@ -28,12 +28,12 @@ import gobject from sugar import mime -from carquinyol import layoutmanager from carquinyol import migration -from carquinyol.layoutmanager import MAX_QUERY_LIMIT from carquinyol.filestore import FileStore from carquinyol.optimizer import Optimizer +from semanticxo import layoutmanager +from semanticxo.layoutmanager import MAX_QUERY_LIMIT from semanticxo.metadatastore import MetadataStore from semanticxo.indexstore import IndexStore diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index 5f518ab..b2c0980 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -21,7 +21,7 @@ import tempfile import gobject -from carquinyol import layoutmanager +from semanticxo import layoutmanager class FileStore(object): diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py index 1745f2c..18f02c7 100644 --- a/src/carquinyol/migration.py +++ b/src/carquinyol/migration.py @@ -24,7 +24,7 @@ import time import cjson -from carquinyol import layoutmanager +from semanticxo import layoutmanager DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py index c038c2b..44d4836 100644 --- a/src/carquinyol/optimizer.py +++ b/src/carquinyol/optimizer.py @@ -21,7 +21,7 @@ import subprocess import gobject -from carquinyol import layoutmanager +from semanticxo import layoutmanager class Optimizer(object): diff --git a/src/semanticxo/indexstore.py b/src/semanticxo/indexstore.py index 50298e0..8947ed5 100644 --- a/src/semanticxo/indexstore.py +++ b/src/semanticxo/indexstore.py @@ -16,14 +16,13 @@ import logging import os -import sys - import gobject -import xapian #@UnresolvedImport -from xapian import WritableDatabase, Document, Enquire, Query #@UnresolvedImport +import httplib +import urllib -from carquinyol import layoutmanager -from carquinyol.layoutmanager import MAX_QUERY_LIMIT +from semanticxo import layoutmanager +from semanticxo.layoutmanager import MAX_QUERY_LIMIT +from semanticxo.metadatastore import OLPC_TERMS _VALUE_UID = 0 _VALUE_TIMESTAMP = 1 @@ -50,183 +49,59 @@ _PROPERTIES_NOT_TO_INDEX = ['timestamp', 'preview'] _MAX_RESULTS = int(2 ** 31 - 1) -_QUERY_TERM_MAP = { - 'uid': _PREFIX_UID, - 'activity': _PREFIX_ACTIVITY, - 'activity_id': _PREFIX_ACTIVITY_ID, - 'mime_type': _PREFIX_MIME_TYPE, - 'keep': _PREFIX_KEEP, -} - -_QUERY_VALUE_MAP = { - 'timestamp': {'number': _VALUE_TIMESTAMP, 'type': float}, - 'filesize': {'number': _VALUE_FILESIZE, 'type': int}, - 'creation_time': {'number': _VALUE_CREATION_TIME, 'type': float}, -} - - -class TermGenerator (xapian.TermGenerator): - - def index_document(self, document, properties): - document.add_value(_VALUE_TIMESTAMP, - xapian.sortable_serialise(float(properties['timestamp']))) - document.add_value(_VALUE_TITLE, properties.get('title', '').strip()) - if 'filesize' in properties: - try: - document.add_value(_VALUE_FILESIZE, - xapian.sortable_serialise(int(properties['filesize']))) - except (ValueError, TypeError): - logging.debug('Invalid value for filesize property: %s', - properties['filesize']) - if 'creation_time' in properties: - try: - document.add_value( - _VALUE_CREATION_TIME, xapian.sortable_serialise( - float(properties['creation_time']))) - except (ValueError, TypeError): - logging.debug('Invalid value for creation_time property: %s', - properties['creation_time']) - - self.set_document(document) - - properties = dict(properties) - self._index_known(document, properties) - self._index_unknown(document, properties) - - def _index_known(self, document, properties): - for name, prefix in _QUERY_TERM_MAP.items(): - if (name not in properties): - continue - - self._index_property(document, name, properties.pop(name), prefix) - - def _index_unknown(self, document, properties): - for name, value in properties.items(): - self._index_property(document, name, value) - - def _index_property(self, doc, name, value, prefix=''): - if name in _PROPERTIES_NOT_TO_INDEX or not value: - return - - if isinstance(value, unicode): - value = value.encode('utf-8') - elif not isinstance(value, basestring): - value = str(value) - - # We need to add the full value (i.e. not split into words) so - # we can enumerate unique values. It also simplifies setting up - # dictionary-based queries. - if prefix: - doc.add_term(_PREFIX_FULL_VALUE + prefix + value) - - self.index_text(value, 1, prefix or _PREFIX_NONE) - self.increase_termpos() - +_QUERY_TERMS = ['uid', 'activity', 'activity_id', 'mime_type', 'timestamp', 'filesize', 'creation_time'] -class QueryParser (xapian.QueryParser): - """QueryParser that understands dictionaries and Xapian query strings. - - The dictionary contains metadata names as keys and either basic types - (exact match), 2-tuples (range, only valid for value-stored metadata) - or a list (multiple exact matches joined with OR) as values. - An empty dictionary matches everything. Queries from different keys - (i.e. different metadata names) are joined with AND. - """ +class Enquire(object): def __init__(self): - xapian.QueryParser.__init__(self) - - for name, prefix in _QUERY_TERM_MAP.items(): - self.add_prefix(name, prefix) - self.add_prefix('', prefix) - - self.add_prefix('', _PREFIX_NONE) - - def _parse_query_term(self, name, prefix, value): - if isinstance(value, list): - subqueries = [self._parse_query_term(name, prefix, word) - for word in value] - return Query(Query.OP_OR, subqueries) - - elif prefix: - return Query(_PREFIX_FULL_VALUE + prefix + str(value)) - else: - return Query(_PREFIX_NONE + str(value)) - - def _parse_query_value_range(self, name, info, value): - if len(value) != 2: - raise TypeError( - 'Only tuples of size 2 have a defined meaning. ' - 'Did you mean to pass a list instead?') - - start, end = value - return Query(Query.OP_VALUE_RANGE, info['number'], - self._convert_value(info, start), self._convert_value(info, end)) - - def _convert_value(self, info, value): - if info['type'] in (float, int, long): - return xapian.sortable_serialise(info['type'](value)) - - return str(info['type'](value)) - - def _parse_query_value(self, name, info, value): - if isinstance(value, list): - subqueries = [self._parse_query_value(name, info, word) - for word in value] - return Query(Query.OP_OR, subqueries) - - elif isinstance(value, tuple): - return self._parse_query_value_range(name, info, value) - - elif isinstance(value, dict): - # compatibility option for timestamp: {'start': 0, 'end': 1} - start = value.get('start', 0) - end = value.get('end', sys.maxint) - return self._parse_query_value_range(name, info, (start, end)) - - else: - return self._parse_query_value_range(name, info, (value, value)) - - def _parse_query_xapian(self, query_str): - try: - return xapian.QueryParser.parse_query( - self, query_str, - QueryParser.FLAG_PHRASE | - QueryParser.FLAG_BOOLEAN | - QueryParser.FLAG_LOVEHATE | - QueryParser.FLAG_WILDCARD, - '') - - except xapian.QueryParserError, exception: - logging.warning('Invalid query string: ' + exception.get_msg()) - return Query() - - # pylint: disable=W0221 + self._graph_patterns = [] + pass + def parse_query(self, query_dict, query_string): - logging.debug('parse_query %r %r', query_dict, query_string) - queries = [] + logging.debug('[IDX] parse_query %r %r', query_dict, query_string) + patterns = [] query_dict = dict(query_dict) + # Full text search if query_string is not None: - queries.append(self._parse_query_xapian(str(query_string))) + # TODO Do something with a regexp in title + pass + # Properties based search for name, value in query_dict.items(): - if name in _QUERY_TERM_MAP: - queries.append(self._parse_query_term(name, - _QUERY_TERM_MAP[name], value)) - elif name in _QUERY_VALUE_MAP: - queries.append(self._parse_query_value(name, - _QUERY_VALUE_MAP[name], value)) + if name in _QUERY_TERMS: + pattern = "?entry <%s> \"%s\"." % (OLPC_TERMS[name], value) + patterns.append(pattern) else: logging.warning('Unknown term: %r=%r', name, value) - if not queries: - queries.append(Query('')) - - logging.debug('queries: %r', [str(q) for q in queries]) - return Query(Query.OP_AND, queries) - + # Get all the entries + if not patterns: + pattern = "?entry <%s>." % OLPC_TERMS['DSObject'] + patterns.append(pattern) + # Add a pattern to get the UID + patterns.append("?entry <%s> ?uid." % OLPC_TERMS['uid']) + + # Save the query + self._graph_patterns = patterns + logging.debug('[IDX] patterns %r', [str(q) for q in patterns]) + + def set_sort_by_value(self, name, incremental): + # TODO Convert order by into SPARQL ORDER BY clause (map word to URI and adjust +-) + pass + + def get_set(self, offset, limit): + query = "SELECT ?uid WHERE {\n %s \n}" % "\n".join(self._graph_patterns) + params = {'query': query, 'format' : 'csv'} + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + conn = httplib.HTTPConnection('127.0.0.1:8080') + conn.request("POST", "/sparql", urllib.urlencode(params), headers=headers) + response = conn.getresponse() + results = response.read().split('\n')[1:-1] + conn.close() + return results + class IndexStore(object): ''' Index metadata and provide rich query facilities on it. @@ -239,11 +114,10 @@ class IndexStore(object): self._pending_writes = 0 self._index_updated_path = os.path.join( layoutmanager.get_instance().get_root_path(), 'index_updated') - + def open_index(self): - index_path = layoutmanager.get_instance().get_index_path() - self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN) - + pass + def close_index(self): """Close index database if it is open.""" if not self._database: @@ -260,42 +134,30 @@ class IndexStore(object): os.remove(os.path.join(index_path, f)) def contains(self, uid): - postings = self._database.postlist(_PREFIX_FULL_VALUE + \ - _PREFIX_UID + uid) - try: - __ = postings.next() - except StopIteration: - return False - return True + enquire = Enquire() + enquire.parse_query({'uid' : uid}, None) + query_result = enquire.get_set(0, 1) + return len(query_result) == 1 def store(self, uid, properties): - #print '[IDX] store ' + uid + ' ' + str(properties) - pass - + logging.debug('[IDX] store ' + uid) + # TODO needed ? + def find(self, query): - print '[IDX] find ' + str(query) + logging.debug('[IDX] find ' + str(query)) offset = query.pop('offset', 0) limit = query.pop('limit', MAX_QUERY_LIMIT) order_by = query.pop('order_by', []) query_string = query.pop('query', None) - query_parser = QueryParser() - query_parser.set_database(self._database) - - enquire = Enquire(self._database) - enquire.set_query(query_parser.parse_query(query, query_string)) - - # TODO Convert query string into regexp based on any property - - # This will assure that the results count is exact. - check_at_least = offset + limit + 1 + enquire = Enquire() + enquire.parse_query(query, query_string) if not order_by: order_by = '+timestamp' else: order_by = order_by[0] - # TODO Convert order by into SPARQL ORDER BY clause (map word to URI and adjust +-) if order_by == '+timestamp': enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) elif order_by == '-timestamp': @@ -315,20 +177,22 @@ class IndexStore(object): else: logging.warning('Unsupported property for sorting: %s', order_by) - query_result = enquire.get_mset(offset, limit, check_at_least) - total_count = query_result.get_matches_estimated() + query_result = enquire.get_set(offset, limit) + total_count = len(query_result) uids = [] for hit in query_result: - uids.append(hit.document.get_value(_VALUE_UID)) + uids.append(hit.split(",")[1][1:-1]) return (uids, total_count) def delete(self, uid): + # TODO Needed ? self._database.delete_document(_PREFIX_FULL_VALUE + _PREFIX_UID + uid) self._flush() def get_activities(self): + logging.debug('[IDX] get_activities') activities = [] prefix = _PREFIX_FULL_VALUE + _PREFIX_ACTIVITY for term in self._database.allterms(prefix): @@ -362,7 +226,7 @@ class IndexStore(object): """Called after any database mutation""" logging.debug('IndexStore.flush: force=%r _pending_writes=%r', force, self._pending_writes) - + self._set_index_updated(False) if self._flush_timeout is not None: @@ -371,7 +235,6 @@ class IndexStore(object): self._pending_writes += 1 if force or self._pending_writes > _FLUSH_THRESHOLD: - self._database.flush() self._pending_writes = 0 self._set_index_updated(True) else: diff --git a/src/carquinyol/layoutmanager.py b/src/semanticxo/layoutmanager.py index 3179a98..ae13f7e 100644 --- a/src/carquinyol/layoutmanager.py +++ b/src/semanticxo/layoutmanager.py @@ -16,6 +16,8 @@ import os import logging +import httplib +import urllib MAX_QUERY_LIMIT = 40960 CURRENT_LAYOUT_VERSION = 6 @@ -81,12 +83,23 @@ class LayoutManager(object): return os.path.join(self.get_checksums_dir(), 'queue') def find_all(self): + logging.debug('[LMA] find all') + print "find all" uids = [] - for f in os.listdir(self._root_path): - if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: - for g in os.listdir(os.path.join(self._root_path, f)): - if len(g) == 36: - uids.append(g) + query = """SELECT ?uid WHERE { + ?s . + ?s ?uid. + }""" + params = {'query': query, 'format' : 'csv'} + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + conn = httplib.HTTPConnection('127.0.0.1:8080') + conn.request("POST", "/sparql", urllib.urlencode(params), headers=headers) + response = conn.getresponse() + results = response.read().split('\n')[1:-1] + logging.debug(results) + conn.close() + print "a" + return uids def is_empty(self): diff --git a/src/semanticxo/metadatastore.py b/src/semanticxo/metadatastore.py index f6f9081..66e5893 100644 --- a/src/semanticxo/metadatastore.py +++ b/src/semanticxo/metadatastore.py @@ -3,6 +3,7 @@ Created on Apr 11, 2011 @author: cgueret ''' +import logging import httplib import urllib import time @@ -11,6 +12,8 @@ from rdflib import ConjunctiveGraph, RDF, URIRef, Namespace, Literal OLPC = Namespace("http://example.org/") OLPC_TERMS = Namespace("http://example.org/terms#") +_QUERY_INT_KEY = ['timestamp', 'filesize', 'creation_time'] + class MetadataStore(object): ''' Store metadata into the triple store using HTTP calls. @@ -59,16 +62,22 @@ class MetadataStore(object): def retrieve(self, uid, properties=None): - print '[MDS] retrieve ' + uid + logging.debug('[MDS] retrieve %r' % uid) props = {} query = 'SELECT * WHERE { <%s> ?p ?o}' % self._get_resource(uid) for line in self._sparql_get(query): - print line - + (id, prop, value) = line.split(',') + if prop[4:-1].startswith(OLPC_TERMS): + key=prop[4:-1].split(OLPC_TERMS)[1] + if key in _QUERY_INT_KEY: + props[key] = int(value[1:-1]) + else: + props[key] = value[1:-1] + # HACK: This is expected to be always present if 'creation_time' not in props: props['creation_time'] = int(time.time()) - + return props def delete(self, uid): -- cgit v0.9.1