author | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-10-08 16:37:38 (GMT)
---|---|---
committer | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-10-08 16:37:38 (GMT)
commit | 9bd1bff4d8ab6247e5793c0b78b9bf2581773341 | (patch)
tree | 13a4914f0de789d3c6dd9acf70a273f6d34c9230 |
parent | e29723a33192cb27b1de83fd8e1abd110bbc1433 | (diff)
parent | b1c4a254adc0573221c0d6661459248fd3c507ef | (diff)
Merge branch 'master' of ../datastore2
Conflicts:
src/olpc/datastore/backingstore.py
src/olpc/datastore/datastore.py
src/olpc/datastore/xapianindex.py
tests/test_sugar.py
36 files changed, 1469 insertions, 5081 deletions
diff --git a/Makefile.am b/Makefile.am index 37d04c1..5fa2790 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,10 @@ -SUBDIRS = bin etc secore src +ACLOCAL_AMFLAGS = -I m4 + +SUBDIRS = bin etc src test: @cd tests $(MAKE) -C tests test EXTRA_DIST = README.txt LICENSE.GPL + @@ -1,45 +1 @@ -Datastore ---------- -A simple log like datastore able to connect with multiple -backends. The datastore supports connectionig and disconnecting from -backends on the fly to help the support the limit space/memory -characteristics of the OLPC system and the fact that network services -may become unavailable at times - -API ---- - -For developer information see the doc tests in: - - src/olpc/datastore/tests/query.txt - - -Dependencies ------------- - -xapian -- integrated full text indexing - svn co svn://svn.xapian.org/xapian/trunk xapian - currently this requires a checkout - -secore -- pythonic xapian binding -- include in disto but from - http://flaxcode.googlecode.com/svn/trunk/libs/secore - -dbus -- command and control - -ore.main -- (optional) A command line application framework/shell - used in bin/datasore. If you don't want to use this dep - for now run bin/datasore-native - - - -Converters ----------- -(used to index binaries) - -pdftotext from poppler-utils -abiword/write - - -Benjamin Saller -Copyright ObjectRealms, LLC. 2007 @@ -1,3 +1,5 @@ #!/bin/sh +export ACLOCAL="aclocal -I m4" + autoreconf -i ./configure "$@" diff --git a/bin/datastore-service b/bin/datastore-service index aba7112..850af9d 100755 --- a/bin/datastore-service +++ b/bin/datastore-service @@ -4,46 +4,26 @@ import gobject import dbus.service import dbus.mainloop.glib import dbus.glib -from olpc.datastore import DataStore, backingstore +from olpc.datastore.datastore import DataStore from sugar import logger # Path handling profile = os.environ.get('SUGAR_PROFILE', 'default') base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) -repo_dir = os.path.join(base_dir, 'datastore') - -# operate from the repo directory -if not os.path.exists(repo_dir): os.makedirs(repo_dir) - log_dir = os.path.join(base_dir, "logs") if not os.path.exists(log_dir): os.makedirs(log_dir) -#os.chdir(repo_dir) - # setup logger logger.start('datastore') -# check for old lockfiles, the rules here are that we can't be -# connected to a tty. 
If we are not then in all likelyhood the process -# was started automatically, which hopefully implies a single instance -if not sys.stdin.isatty(): - lf = os.path.join(repo_dir, 'fulltext', 'flintlock') - if os.path.exists(lf): - logging.warning("Old lock file found -- removing.") - os.unlink(lf) - - # build the datastore dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() connected = True ds = DataStore() -ds.registerBackend(backingstore.FileBackingStore) -ds.registerBackend(backingstore.InplaceFileBackingStore) # and run it -logging.info("Starting Datastore %s" % (repo_dir)) mainloop = gobject.MainLoop() def handle_disconnect(): @@ -65,11 +45,6 @@ signal.signal(signal.SIGHUP, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) def main(): - if '-m' in sys.argv: - # mount automatically for local testing - ds.mount(repo_dir) - ds.complete_indexing() - try: mainloop.run() except KeyboardInterrupt: @@ -81,7 +56,3 @@ main() ds.stop() -#import hotshot -#p = hotshot.Profile('hs.prof') -#p.run('main()') - diff --git a/configure.ac b/configure.ac index dbcc25d..1531c69 100644 --- a/configure.ac +++ b/configure.ac @@ -2,17 +2,21 @@ AC_INIT([sugar-datastore],[0.82.0],[],[sugar-datastore]) AC_PREREQ([2.59]) +AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_SRCDIR([configure.ac]) AM_INIT_AUTOMAKE([1.9 foreign dist-bzip2 no-dist-gzip]) +AC_DISABLE_STATIC +AC_PROG_LIBTOOL + AM_PATH_PYTHON +AM_CHECK_PYTHON_HEADERS(,[AC_MSG_ERROR(could not find Python headers)]) AC_OUTPUT([ Makefile bin/Makefile etc/Makefile -secore/Makefile src/Makefile src/olpc/Makefile src/olpc/datastore/Makefile diff --git a/m4/python.m4 b/m4/python.m4 new file mode 100644 index 0000000..e1c5266 --- /dev/null +++ b/m4/python.m4 @@ -0,0 +1,62 @@ +## this one is commonly used with AM_PATH_PYTHONDIR ... +dnl AM_CHECK_PYMOD(MODNAME [,SYMBOL [,ACTION-IF-FOUND [,ACTION-IF-NOT-FOUND]]]) +dnl Check if a module containing a given symbol is visible to python. 
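Editor's note: the probe that AM_CHECK_PYMOD (described just above) feeds to `$PYTHON` is easier to read in plain Python. A rough equivalent sketch; `check_pymod` is an illustrative name, not part of the build system:

```python
# Rough Python equivalent of the AM_CHECK_PYMOD probe: without a symbol,
# only ImportError counts as "module missing"; any other exception still
# means the module is visible. With a symbol, it evaluates module.symbol.
import subprocess
import sys

def check_pymod(module, symbol=None, python=sys.executable):
    if symbol is None:
        prog = ("import sys\n"
                "try:\n"
                "    import %s\n"
                "except ImportError:\n"
                "    sys.exit(1)\n"
                "except Exception:\n"
                "    sys.exit(0)\n"
                "sys.exit(0)\n") % module
    else:
        prog = "import %s\n%s.%s\n" % (module, module, symbol)
    return subprocess.call([python, "-c", prog]) == 0

print(check_pymod("os"), check_pymod("no_such_module"))  # True False
```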
+AC_DEFUN([AM_CHECK_PYMOD], +[AC_REQUIRE([AM_PATH_PYTHON]) +py_mod_var=`echo $1['_']$2 | sed 'y%./+-%__p_%'` +AC_MSG_CHECKING(for ifelse([$2],[],,[$2 in ])python module $1) +AC_CACHE_VAL(py_cv_mod_$py_mod_var, [ +ifelse([$2],[], [prog=" +import sys +try: + import $1 +except ImportError: + sys.exit(1) +except: + sys.exit(0) +sys.exit(0)"], [prog=" +import $1 +$1.$2"]) +if $PYTHON -c "$prog" 1>&AC_FD_CC 2>&AC_FD_CC + then + eval "py_cv_mod_$py_mod_var=yes" + else + eval "py_cv_mod_$py_mod_var=no" + fi +]) +py_val=`eval "echo \`echo '$py_cv_mod_'$py_mod_var\`"` +if test "x$py_val" != xno; then + AC_MSG_RESULT(yes) + ifelse([$3], [],, [$3 +])dnl +else + AC_MSG_RESULT(no) + ifelse([$4], [],, [$4 +])dnl +fi +]) + +dnl a macro to check for ability to create python extensions +dnl AM_CHECK_PYTHON_HEADERS([ACTION-IF-POSSIBLE], [ACTION-IF-NOT-POSSIBLE]) +dnl function also defines PYTHON_INCLUDES +AC_DEFUN([AM_CHECK_PYTHON_HEADERS], +[AC_REQUIRE([AM_PATH_PYTHON]) +AC_MSG_CHECKING(for headers required to compile python extensions) +dnl deduce PYTHON_INCLUDES +py_prefix=`$PYTHON -c "import sys; print sys.prefix"` +py_exec_prefix=`$PYTHON -c "import sys; print sys.exec_prefix"` +PYTHON_INCLUDES="-I${py_prefix}/include/python${PYTHON_VERSION}" +if test "$py_prefix" != "$py_exec_prefix"; then + PYTHON_INCLUDES="$PYTHON_INCLUDES -I${py_exec_prefix}/include/python${PYTHON_VERSION}" +fi +AC_SUBST(PYTHON_INCLUDES) +dnl check if the headers exist: +save_CPPFLAGS="$CPPFLAGS" +CPPFLAGS="$CPPFLAGS $PYTHON_INCLUDES" +AC_TRY_CPP([#include <Python.h>],dnl +[AC_MSG_RESULT(found) +$1],dnl +[AC_MSG_RESULT(not found) +$2]) +CPPFLAGS="$save_CPPFLAGS" +]) diff --git a/secore/Makefile.am b/secore/Makefile.am deleted file mode 100644 index 393ba8f..0000000 --- a/secore/Makefile.am +++ /dev/null @@ -1,12 +0,0 @@ -datastoredir = $(pythondir)/secore -datastore_PYTHON = \ - __init__.py \ - datastructures.py \ - fieldmappings.py \ - searchconnection.py \ - errors.py \ - highlight.py \ - marshall.py \ - fieldactions.py \ - indexerconnection.py \ - parsedate.py diff --git a/secore/__init__.py b/secore/__init__.py deleted file mode 100644 index 157fea4..0000000 --- a/secore/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -"""Search engine Core. - -See the accompanying documentation for details. In particular, there should be -an accompanying file "introduction.html" (or "introduction.rst") which gives -details of how to use the secore package. 
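Editor's note: the AM_CHECK_PYTHON_HEADERS macro added in m4/python.m4 above boils down to computing include flags from the interpreter's prefixes. A sketch in Python 3; the `sysconfig` line is a modern cross-check, not something the macro does:

```python
# One -I flag per prefix; exec_prefix is added only when it differs.
import sys
import sysconfig

ver = "%d.%d" % sys.version_info[:2]
includes = ["-I%s/include/python%s" % (sys.prefix, ver)]
if sys.exec_prefix != sys.prefix:
    includes.append("-I%s/include/python%s" % (sys.exec_prefix, ver))
print(" ".join(includes))
print("-I" + sysconfig.get_paths()["include"])  # modern equivalent
```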
- -""" -__docformat__ = "restructuredtext en" - -from datastructures import * -from errors import * -from indexerconnection import * -from searchconnection import * diff --git a/secore/datastructures.py b/secore/datastructures.py deleted file mode 100644 index 588ac06..0000000 --- a/secore/datastructures.py +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""datastructures.py: Datastructures for search engine core. - -""" -__docformat__ = "restructuredtext en" - -import xapian as _xapian -import cPickle as _cPickle - -class Field(object): - # Use __slots__ because we're going to have very many Field objects in - # typical usage. - __slots__ = 'name', 'value' - - def __init__(self, name, value): - self.name = name - self.value = value - - def __repr__(self): - return 'Field(%r, %r)' % (self.name, self.value) - -class UnprocessedDocument(object): - """A unprocessed document to be passed to the indexer. - - This represents an item to be processed and stored in the search engine. - Each document will be processed by the indexer to generate a - ProcessedDocument, which can then be stored in the search engine index. - - Note that some information in an UnprocessedDocument will not be - represented in the ProcessedDocument: therefore, it is not possible to - retrieve an UnprocessedDocument from the search engine index. - - An unprocessed document is a simple container with two attributes: - - - `fields` is a list of Field objects. - - `id` is a string holding a unique identifier for the document (or - None to get the database to allocate a unique identifier automatically - when the document is added). - - """ - - __slots__ = 'id', 'fields', - def __init__(self, id=None, fields=None): - self.id = id - if fields is None: - self.fields = [] - else: - self.fields = fields - - def __repr__(self): - return 'UnprocessedDocument(%r, %r)' % (self.id, self.fields) - -class ProcessedDocument(object): - """A processed document, as stored in the index. - - This represents an item which is ready to be stored in the search engine, - or which has been returned by the search engine. - - """ - - __slots__ = '_doc', '_fieldmappings', '_data', '_id' - def __init__(self, fieldmappings, xapdoc=None): - """Create a ProcessedDocument. - - `fieldmappings` is the configuration from a database connection used lookup - the configuration to use to store each field. - - If supplied, `xapdoc` is a Xapian document to store in the processed - document. Otherwise, a new Xapian document is created. - - """ - if xapdoc is None: - self._doc = _xapian.Document() - else: - self._doc = xapdoc - self._fieldmappings = fieldmappings - self._data = None - self._id = None - - def add_term(self, field, term, wdfinc=1, positions=None): - """Add a term to the document. 
- - Terms are the main unit of information used for performing searches. - - - `field` is the field to add the term to. - - `term` is the term to add. - - `wdfinc` is the value to increase the within-document-frequency - measure for the term by. - - `positions` is the positional information to add for the term. - This may be None to indicate that there is no positional information, - or may be an integer to specify one position, or may be a sequence of - integers to specify several positions. (Note that the wdf is not - increased automatically for each position: if you add a term at 7 - positions, and the wdfinc value is 2, the total wdf for the term will - only be increased by 2, not by 14.) - - """ - prefix = self._fieldmappings.get_prefix(field) - if len(term) > 0: - # We use the following check, rather than "isupper()" to ensure - # that we match the check performed by the queryparser, regardless - # of our locale. - if ord(term[0]) >= ord('A') and ord(term[0]) <= ord('Z'): - prefix = prefix + ':' - if positions is None: - self._doc.add_term(prefix + term, wdfinc) - elif isinstance(positions, int): - self._doc.add_posting(prefix + term, positions, wdfinc) - else: - self._doc.add_term(prefix + term, wdfinc) - for pos in positions: - self._doc.add_posting(prefix + term, pos, 0) - - def add_value(self, field, value): - """Add a value to the document. - - Values are additional units of information used when performing - searches. Note that values are _not_ intended to be used to store - information for display in the search results - use the document data - for that. The intention is that as little information as possible is - stored in values, so that they can be accessed as quickly as possible - during the search operation. - - Unlike terms, each document may have at most one value in each field - (whereas there may be an arbitrary number of terms in a given field). - If an attempt to add multiple values to a single field is made, only - the last value added will be stored. - - """ - slot = self._fieldmappings.get_slot(field) - self._doc.add_value(slot, value) - - def get_value(self, field): - """Get a value from the document. - - """ - slot = self._fieldmappings.get_slot(field) - return self._doc.get_value(slot) - - def prepare(self): - """Prepare the document for adding to a xapian database. - - This updates the internal xapian document with any changes which have - been made, and then returns it. - - """ - if self._data is not None: - self._doc.set_data(_cPickle.dumps(self._data, 2)) - self._data = None - return self._doc - - def _get_data(self): - if self._data is None: - rawdata = self._doc.get_data() - if rawdata == '': - self._data = {} - else: - self._data = _cPickle.loads(rawdata) - return self._data - def _set_data(self, data): - if not isinstance(data, dict): - raise TypeError("Cannot set data to any type other than a dict") - self._data = data - data = property(_get_data, _set_data, doc= - """The data stored in this processed document. - - This data is a dictionary of entries, where the key is a fieldname, and the - value is a list of strings. 
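Editor's note: one detail of add_term above is worth pulling out. A term starting with a capital letter gets a ':' between prefix and term, matching the Xapian query parser's convention so a prefixed term cannot be mistaken for a longer prefix. Stand-alone, with an illustrative helper name:

```python
# Mirrors the ord('A') <= term[0] <= ord('Z') check in add_term above.
def prefixed_term(prefix, term):
    if term and 'A' <= term[0] <= 'Z':
        prefix += ':'
    return prefix + term

assert prefixed_term('XA', 'hello') == 'XAhello'
assert prefixed_term('XA', 'Hello') == 'XA:Hello'
```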
- - """) - - def _get_id(self): - if self._id is None: - tl = self._doc.termlist() - try: - term = tl.skip_to('Q').term - if len(term) == 0 or term[0] != 'Q': - return None - self._id = term[1:] - except StopIteration: - self._id = None - return None - - return self._id - def _set_id(self, id): - tl = self._doc.termlist() - try: - term = tl.skip_to('Q').term - except StopIteration: - term = '' - if len(term) != 0 and term[0] == 'Q': - self._doc.remove_term(term) - if id is not None: - self._doc.add_term('Q' + id, 0) - id = property(_get_id, _set_id, doc= - """The unique ID for this document. - - """) - - def __repr__(self): - return '<ProcessedDocument(%r)>' % (self.id) - -if __name__ == '__main__': - import doctest, sys - doctest.testmod (sys.modules[__name__]) diff --git a/secore/errors.py b/secore/errors.py deleted file mode 100644 index b6ad00f..0000000 --- a/secore/errors.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""errors.py: Exceptions for the search engine core. - -""" -__docformat__ = "restructuredtext en" - -class SearchEngineError(Exception): - r"""Base class for exceptions thrown by the search engine. - - Any errors generated by the python level interface to xapian will be - instances of this class or its subclasses. - - """ - -class IndexerError(SearchEngineError): - r"""Class used to report errors from the indexing API. - - """ - -class SearchError(SearchEngineError): - r"""Class used to report errors from the search API. - - """ - diff --git a/secore/fieldactions.py b/secore/fieldactions.py deleted file mode 100644 index c595f0b..0000000 --- a/secore/fieldactions.py +++ /dev/null @@ -1,358 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""fieldactions.py: Definitions and implementations of field actions. - -""" -__docformat__ = "restructuredtext en" - -import errors as _errors -import marshall as _marshall -import xapian as _xapian -import parsedate as _parsedate - -def _act_store_content(fieldname, doc, value, context): - """Perform the STORE_CONTENT action. 
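Editor's note: before the field actions, a word on the id property deleted just above. The document id is encoded as a term with the reserved 'Q' prefix; the same convention reappears throughout indexerconnection.py later in this diff. The convention, stand-alone:

```python
# _set_id stores 'Q' + id as a term; _get_id finds the first 'Q' term
# and strips the prefix.
def id_to_term(doc_id):
    return 'Q' + doc_id

def term_to_id(term):
    return term[1:] if term.startswith('Q') else None

assert term_to_id(id_to_term('abc123')) == 'abc123'
```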
- - """ - try: - fielddata = doc.data[fieldname] - except KeyError: - fielddata = [] - doc.data[fieldname] = fielddata - fielddata.append(value) - -def _act_index_exact(fieldname, doc, value, context): - """Perform the INDEX_EXACT action. - - """ - doc.add_term(fieldname, value, 0) - -def _act_index_freetext(fieldname, doc, value, context, weight=1, - language=None, stop=None, spell=False, - nopos=False, noprefix=False): - """Perform the INDEX_FREETEXT action. - - """ - termgen = _xapian.TermGenerator() - if language is not None: - termgen.set_stemmer(_xapian.Stem(language)) - - if stop is not None: - stopper = _xapian.SimpleStopper() - for term in stop: - stopper.add (term) - termgen.set_stopper (stopper) - - if spell: - termgen.set_database(context.index) - termgen.set_flags(termgen.FLAG_SPELLING) - - termgen.set_document(doc._doc) - termgen.set_termpos(context.current_position) - if nopos: - termgen.index_text_without_positions(value, weight, '') - else: - termgen.index_text(value, weight, '') - - if not noprefix: - # Store a second copy of the term with a prefix, for field-specific - # searches. - prefix = doc._fieldmappings.get_prefix(fieldname) - if len(prefix) != 0: - termgen.set_termpos(context.current_position) - if nopos: - termgen.index_text_without_positions(value, weight, prefix) - else: - termgen.index_text(value, weight, prefix) - - # Add a gap between each field instance, so that phrase searches don't - # match across instances. - termgen.increase_termpos(10) - context.current_position = termgen.get_termpos() - -class SortableMarshaller(object): - """Implementation of marshalling for sortable values. - - """ - def __init__(self, indexing=True): - if indexing: - self._err = _errors.IndexerError - else: - self._err = _errors.SearchError - - def marshall_string(self, fieldname, value): - """Marshall a value for sorting in lexicograpical order. - - This returns the input as the output, since strings already sort in - lexicographical order. - - """ - return value - - def marshall_float(self, fieldname, value): - """Marshall a value for sorting as a floating point value. - - """ - # convert the value to a float - try: - value = float(value) - except ValueError: - raise self._err("Value supplied to field %r must be a " - "valid floating point number: was %r" % - (fieldname, value)) - return _marshall.float_to_string(value) - - def marshall_date(self, fieldname, value): - """Marshall a value for sorting as a date. - - """ - try: - value = _parsedate.date_from_string(value) - except ValueError, e: - raise self._err("Value supplied to field %r must be a " - "valid date: was %r: error is '%s'" % - (fieldname, value, str(e))) - return _marshall.date_to_string(value) - - def get_marshall_function(self, fieldname, sorttype): - """Get a function used to marshall values of a given sorttype. - - """ - try: - return { - None: self.marshall_string, - 'string': self.marshall_string, - 'float': self.marshall_float, - 'date': self.marshall_date, - }[sorttype] - except KeyError: - raise self._err("Unknown sort type %r for field %r" % - (sorttype, fieldname)) - - -def _act_sort_and_collapse(fieldname, doc, value, context, type=None): - """Perform the SORTABLE action. - - """ - marshaller = SortableMarshaller() - fn = marshaller.get_marshall_function(fieldname, type) - value = fn(fieldname, value) - doc.add_value(fieldname, value) - -class ActionContext(object): - """The context in which an action is performed. - - This is just used to pass term generators, word positions, and the like - around. 
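Editor's note: the INDEX_FREETEXT action above drives Xapian's TermGenerator directly. A minimal sketch against the real bindings (assumes the `xapian` module is installed; the original code is Python 2, but these calls work in either):

```python
# Core of _act_index_freetext: stem, index once unprefixed for general
# search, once with the field prefix for field-specific search, then
# leave a positional gap so phrases don't match across field instances.
import xapian

doc = xapian.Document()
termgen = xapian.TermGenerator()
termgen.set_stemmer(xapian.Stem("en"))
termgen.set_document(doc)
termgen.index_text("free text to index", 1, "")    # weight 1, no prefix
termgen.index_text("free text to index", 1, "XA")  # field-prefixed copy
termgen.increase_termpos(10)                       # inter-instance gap
```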
- - """ - def __init__(self, index): - self.current_language = None - self.current_position = 0 - self.index = index - -class FieldActions(object): - """An object describing the actions to be performed on a field. - - The supported actions are: - - - `STORE_CONTENT`: store the unprocessed content of the field in the search - engine database. All fields which need to be displayed or used when - displaying the search results need to be given this action. - - - `INDEX_EXACT`: index the exact content of the field as a single search - term. Fields whose contents need to be searchable as an "exact match" - need to be given this action. - - - `INDEX_FREETEXT`: index the content of this field as text. The content - will be split into terms, allowing free text searching of the field. Four - optional parameters may be supplied: - - - 'weight' is a multiplier to apply to the importance of the field. This - must be an integer, and the default value is 1. - - 'language' is the language to use when processing the field. This can - be expressed as an ISO 2-letter language code. The supported languages - are those supported by the xapian core in use. - - 'stop' is an iterable of stopwords to filter out of the generated - terms. Note that due to Xapian design, only non-positional terms are - affected, so this is of limited use. - - 'spell' is a boolean flag - if true, the contents of the field will be - used for spelling correction. - - 'nopos' is a boolean flag - if true, positional information is not - stored. - - 'noprefix' is a boolean flag - if true, prevents terms with the field - prefix being generated. This means that searches specific to this - field will not work, and thus should only be used for special cases. - - - `SORTABLE`: index the content of the field such that it can be used to - sort result sets. It also allows result sets to be restricted to those - documents with a field values in a given range. One optional parameter - may be supplied: - - - 'type' is a value indicating how to sort the field. It has several - possible values: - - - 'string' - sort in lexicographic (ie, alphabetical) order. - This is the default, used if no type is set. - - 'float' - treat the values as (decimal representations of) floating - point numbers, and sort in numerical order . The values in the field - must be valid floating point numbers (according to Python's float() - function). - - 'date' - sort in date order. The values must be valid dates (either - Python datetime.date objects, or ISO 8601 format (ie, YYYYMMDD or - YYYY-MM-DD). - - - `COLLAPSE`: index the content of the field such that it can be used to - "collapse" result sets, such that only the highest result with each value - of the field will be returned. - - """ - - # See the class docstring for the meanings of the following constants. - STORE_CONTENT = 1 - INDEX_EXACT = 2 - INDEX_FREETEXT = 3 - SORTABLE = 4 - COLLAPSE = 5 - - # Sorting and collapsing store the data in a value, but the format depends - # on the sort type. Easiest way to implement is to treat them as the same - # action. - SORT_AND_COLLAPSE = -1 - - # NEED_SLOT is a flag used to indicate that an action needs a slot number - NEED_SLOT = 1 - # NEED_PREFIX is a flag used to indicate that an action needs a prefix - NEED_PREFIX = 2 - - def __init__(self, fieldname): - # Dictionary of actions, keyed by type. - self._actions = {} - self._fieldname = fieldname - - def add(self, field_mappings, action, **kwargs): - """Add an action to perform on a field. 
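Editor's note: for orientation, this is how the action constants above are wired up through IndexerConnection.add_field_action(), which appears later in this diff. A hedged usage sketch, assuming the secore package is importable and using illustrative field names:

```python
# Free-text search on 'title' (boosted, stemmed English), stored for
# display, plus a date-sortable 'date' field.
import secore

conn = secore.IndexerConnection("index_dir")
conn.add_field_action("title", secore.FieldActions.INDEX_FREETEXT,
                      weight=5, language="en")
conn.add_field_action("title", secore.FieldActions.STORE_CONTENT)
conn.add_field_action("date", secore.FieldActions.SORTABLE, type="date")
conn.flush()  # persists the field configuration
```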
- - """ - if action not in (FieldActions.STORE_CONTENT, - FieldActions.INDEX_EXACT, - FieldActions.INDEX_FREETEXT, - FieldActions.SORTABLE, - FieldActions.COLLAPSE,): - raise _errors.IndexerError("Unknown field action: %r" % action) - - info = self._action_info[action] - - # Check parameter names - for key in kwargs.keys(): - if key not in info[1]: - raise _errors.IndexerError("Unknown parameter name for action %r: %r" % (info[0], key)) - - # Fields cannot be indexed both with "EXACT" and "FREETEXT": whilst we - # could implement this, the query parser wouldn't know what to do with - # searches. - if action == FieldActions.INDEX_EXACT: - if FieldActions.INDEX_FREETEXT in self._actions: - raise _errors.IndexerError("Field %r is already marked for indexing " - "as free text: cannot mark for indexing " - "as exact text as well" % self._fieldname) - if action == FieldActions.INDEX_FREETEXT: - if FieldActions.INDEX_EXACT in self._actions: - raise _errors.IndexerError("Field %r is already marked for indexing " - "as exact text: cannot mark for indexing " - "as free text as well" % self._fieldname) - - # Fields cannot be indexed as more than one type for "SORTABLE": to - # implement this, we'd need to use a different prefix for each sortable - # type, but even then the search end wouldn't know what to sort on when - # searching. Also, if they're indexed as "COLLAPSE", the value must be - # stored in the right format for the type "SORTABLE". - if action == FieldActions.SORTABLE or action == FieldActions.COLLAPSE: - if action == FieldActions.COLLAPSE: - sorttype = None - else: - try: - sorttype = kwargs['type'] - except KeyError: - sorttype = 'string' - kwargs['type'] = sorttype - action = FieldActions.SORT_AND_COLLAPSE - - try: - oldsortactions = self._actions[FieldActions.SORT_AND_COLLAPSE] - except KeyError: - oldsortactions = () - - if len(oldsortactions) > 0: - for oldsortaction in oldsortactions: - oldsorttype = oldsortaction['type'] - - if sorttype == oldsorttype or oldsorttype is None: - # Use new type - self._actions[action] = [] - elif sorttype is None: - # Use old type - return - else: - raise _errors.IndexerError("Field %r is already marked for " - "sorting, with a different " - "sort type" % self._fieldname) - - if self.NEED_PREFIX in info[3]: - field_mappings.add_prefix(self._fieldname) - if self.NEED_SLOT in info[3]: - field_mappings.add_slot(self._fieldname) - - # Make an entry for the action - if action not in self._actions: - self._actions[action] = [] - - # Check for repetitions of actions - for old_action in self._actions[action]: - if old_action == kwargs: - return - - # Append the action to the list of actions - self._actions[action].append(kwargs) - - def perform(self, doc, value, context): - """Perform the actions on the field. - - - `doc` is a ProcessedDocument to store the result of the actions in. - - `value` is a string holding the value of the field. - - `context` is an ActionContext object used to keep state in. 
- - """ - for type, actionlist in self._actions.iteritems(): - info = self._action_info[type] - for kwargs in actionlist: - info[2](self._fieldname, doc, value, context, **kwargs) - - _action_info = { - STORE_CONTENT: ('STORE_CONTENT', (), _act_store_content, (), ), - INDEX_EXACT: ('INDEX_EXACT', (), _act_index_exact, (NEED_PREFIX,), ), - INDEX_FREETEXT: ('INDEX_FREETEXT', ('weight', 'language', 'stop', 'spell', 'nopos', 'noprefix', ), - _act_index_freetext, (NEED_PREFIX, ), ), - SORTABLE: ('SORTABLE', ('type', ), None, (NEED_SLOT,), ), - COLLAPSE: ('COLLAPSE', (), None, (NEED_SLOT,), ), - SORT_AND_COLLAPSE: ('SORT_AND_COLLAPSE', ('type', ), _act_sort_and_collapse, (NEED_SLOT,), ), - } - -if __name__ == '__main__': - import doctest, sys - doctest.testmod (sys.modules[__name__]) diff --git a/secore/fieldmappings.py b/secore/fieldmappings.py deleted file mode 100644 index 3838ce5..0000000 --- a/secore/fieldmappings.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""fieldmappings.py: Mappings from field names to term prefixes, etc. - -""" -__docformat__ = "restructuredtext en" - -import cPickle as _cPickle - -class FieldMappings(object): - """Mappings from field names to term prefixes, slot values, etc. - - The following mappings are maintained: - - - a mapping from field name to the string prefix to insert at the start of - terms. - - a mapping from field name to the slot numbers to store the field contents - in. - - """ - __slots__ = '_prefixes', '_prefixcount', '_slots', '_slotcount', - - def __init__(self, serialised=None): - """Create a new field mapping object, or unserialise a saved one. - - """ - if serialised is not None: - (self._prefixes, self._prefixcount, - self._slots, self._slotcount) = _cPickle.loads(serialised) - else: - self._prefixes = {} - self._prefixcount = 0 - self._slots = {} - self._slotcount = 0 - - def _genPrefix(self): - """Generate a previously unused prefix. 
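Editor's note: the allocator's counting scheme, doctested below, is easier to see stand-alone. A sketch equivalent to _genPrefix with the counter passed explicitly:

```python
# Base-26 counting over 'A'..'Z', least-significant letter first, with a
# leading 'X' (the Xapian convention: X* prefixes are application-defined).
def gen_prefix(counter):
    res = []
    num = counter
    while num != 0:
        ch = (num - 1) % 26
        res.append(chr(ch + ord("A")))
        num -= ch
        num //= 26
    return "X" + "".join(res)

assert [gen_prefix(n) for n in (1, 2, 26, 27, 28)] == [
    "XA", "XB", "XZ", "XAA", "XBA"]
```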
- - Prefixes are uppercase letters, and start with 'X' (this is a Xapian - convention, for compatibility with other Xapian tools: other starting - letters are reserved for special meanings): - - >>> maps = FieldMappings() - >>> maps._genPrefix() - 'XA' - >>> maps._genPrefix() - 'XB' - >>> [maps._genPrefix() for i in xrange(60)] - ['XC', 'XD', 'XE', 'XF', 'XG', 'XH', 'XI', 'XJ', 'XK', 'XL', 'XM', 'XN', 'XO', 'XP', 'XQ', 'XR', 'XS', 'XT', 'XU', 'XV', 'XW', 'XX', 'XY', 'XZ', 'XAA', 'XBA', 'XCA', 'XDA', 'XEA', 'XFA', 'XGA', 'XHA', 'XIA', 'XJA', 'XKA', 'XLA', 'XMA', 'XNA', 'XOA', 'XPA', 'XQA', 'XRA', 'XSA', 'XTA', 'XUA', 'XVA', 'XWA', 'XXA', 'XYA', 'XZA', 'XAB', 'XBB', 'XCB', 'XDB', 'XEB', 'XFB', 'XGB', 'XHB', 'XIB', 'XJB'] - >>> maps = FieldMappings() - >>> [maps._genPrefix() for i in xrange(27*26 + 5)][-10:] - ['XVZ', 'XWZ', 'XXZ', 'XYZ', 'XZZ', 'XAAA', 'XBAA', 'XCAA', 'XDAA', 'XEAA'] - """ - res = [] - self._prefixcount += 1 - num = self._prefixcount - while num != 0: - ch = (num - 1) % 26 - res.append(chr(ch + ord('A'))) - num -= ch - num = num // 26 - return 'X' + ''.join(res) - - def get_prefix(self, fieldname): - """Get the prefix used for a given field name. - - """ - return self._prefixes[fieldname] - - def get_slot(self, fieldname): - """Get the slot number used for a given field name. - - """ - return self._slots[fieldname] - - def add_prefix(self, fieldname): - """Allocate a prefix for the given field. - - If a prefix is already allocated for this field, this has no effect. - - """ - if fieldname in self._prefixes: - return - self._prefixes[fieldname] = self._genPrefix() - - def add_slot(self, fieldname): - """Allocate a slot number for the given field. - - If a slot number is already allocated for this field, this has no effect. - - """ - if fieldname in self._slots: - return - self._slots[fieldname] = self._slotcount - self._slotcount += 1 - - def serialise(self): - """Serialise the field mappings to a string. - - This can be unserialised by passing the result of this method to the - constructor of a new FieldMappings object. - - """ - return _cPickle.dumps((self._prefixes, - self._prefixcount, - self._slots, - self._slotcount, - ), 2) diff --git a/secore/highlight.py b/secore/highlight.py deleted file mode 100644 index 38f2050..0000000 --- a/secore/highlight.py +++ /dev/null @@ -1,310 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""highlight.py: Highlight and summarise text. - -""" -__docformat__ = "restructuredtext en" - -import re -import xapian - -class Highlighter(object): - """Class for highlighting text and creating contextual summaries. - - >>> hl = Highlighter("en") - >>> hl.makeSample('Hello world.', ['world']) - 'Hello world.' 
- >>> hl.highlight('Hello world', ['world'], ('<', '>')) - 'Hello <world>' - - """ - - # split string into words, spaces, punctuation and markup tags - _split_re = re.compile( - '</\\w+>|<\\w+(?:\\s*\\w+="[^"]*"|\\s*\\w+)*\\s*>|[\\w\']+|\\s+|[^\\w\'\\s<>/]+') - - def __init__(self, language_code='en', stemmer=None): - """Create a new highlighter for the specified language. - - """ - if stemmer is not None: - self.stem = stemmer - else: - self.stem = xapian.Stem(language_code) - - def _split_text(self, text, strip_tags=False): - """Split some text into words and non-words. - - - `text` is the text to process. It may be a unicode object or a utf-8 - encoded simple string. - - `strip_tags` is a flag - False to keep tags, True to strip all tags - from the output. - - Returns a list of utf-8 encoded simple strings. - - """ - if isinstance(text, unicode): - text = text.encode('utf-8') - - words = self._split_re.findall(text) - if strip_tags: - return [w for w in words if w[0] != '<'] - else: - return words - - def _strip_prefix(self, term): - """Strip the prefix off a term. - - Prefixes are any initial capital letters, with the exception that R always - ends a prefix, even if followed by capital letters. - - >>> hl = Highlighter("en") - >>> print hl._strip_prefix('hello') - hello - >>> print hl._strip_prefix('Rhello') - hello - >>> print hl._strip_prefix('XARHello') - Hello - >>> print hl._strip_prefix('XAhello') - hello - >>> print hl._strip_prefix('XAh') - h - >>> print hl._strip_prefix('XA') - <BLANKLINE> - - """ - for p in xrange(len(term)): - if term[p].islower(): - return term[p:] - elif term[p] == 'R': - return term[p+1:] - return '' - - def _query_to_stemmed_words(self, query): - """Convert a query to a list of stemmed words. - - - `query` is the query to parse: it may be xapian.Query object, or a - sequence of terms. - - """ - if isinstance(query, xapian.Query): - return [self._strip_prefix(t) for t in query] - else: - return [self.stem(q.lower()) for q in query] - - - def makeSample(self, text, query, maxlen=600, hl=None): - """Make a contextual summary from the supplied text. - - This basically works by splitting the text into phrases, counting the query - terms in each, and keeping those with the most. - - Any markup tags in the text will be stripped. - - `text` is the source text to summarise. - `query` is either a Xapian query object or a list of (unstemmed) term strings. - `maxlen` is the maximum length of the generated summary. - `hl` is a pair of strings to insert around highlighted terms, e.g. 
('<b>', '</b>') - - """ - - words = self._split_text(text, True) - terms = self._query_to_stemmed_words(query) - - # build blocks delimited by puncuation, and count matching words in each block - # blocks[n] is a block [firstword, endword, charcount, termcount, selected] - blocks = [] - start = end = count = blockchars = 0 - - while end < len(words): - blockchars += len(words[end]) - if words[end].isalnum(): - if self.stem(words[end].lower()) in terms: - count += 1 - end += 1 - elif words[end] in ',.;:?!\n': - end += 1 - blocks.append([start, end, blockchars, count, False]) - start = end - blockchars = 0 - count = 0 - else: - end += 1 - if start != end: - blocks.append([start, end, blockchars, count, False]) - if len(blocks) == 0: - return '' - - # select high-scoring blocks first, down to zero-scoring - chars = 0 - for count in xrange(3, -1, -1): - for b in blocks: - if b[3] >= count: - b[4] = True - chars += b[2] - if chars >= maxlen: break - if chars >= maxlen: break - - # assemble summary - words2 = [] - lastblock = -1 - for i, b in enumerate(blocks): - if b[4]: - if i != lastblock + 1: - words2.append('..') - words2.extend(words[b[0]:b[1]]) - lastblock = i - - if not blocks[-1][4]: - words2.append('..') - - # trim down to maxlen - l = 0 - for i in xrange (len (words2)): - l += len (words2[i]) - if l >= maxlen: - words2[i:] = ['..'] - break - - if hl is None: - return ''.join(words2) - else: - return self._hl(words2, terms, hl) - - def highlight(self, text, query, hl, strip_tags=False): - """Add highlights (string prefix/postfix) to a string. - - `text` is the source to highlight. - `query` is either a Xapian query object or a list of (unstemmed) term strings. - `hl` is a pair of highlight strings, e.g. ('<i>', '</i>') - `strip_tags` strips HTML markout iff True - - >>> hl = Highlighter() - >>> qp = xapian.QueryParser() - >>> q = qp.parse_query('cat dog') - >>> tags = ('[[', ']]') - >>> hl.highlight('The cat went Dogging; but was <i>dog tired</i>.', q, tags) - 'The [[cat]] went [[Dogging]]; but was <i>[[dog]] tired</i>.' - - """ - words = self._split_text(text, strip_tags) - terms = self._query_to_stemmed_words(query) - return self._hl(words, terms, hl) - - def _hl(self, words, terms, hl): - """Add highlights to a list of words. - - `words` is the list of words and non-words to be highlighted.. - `terms` is the list of stemmed words to look for. - - """ - for i, w in enumerate(words): - if self.stem(words[i].lower()) in terms: - words[i] = ''.join((hl[0], w, hl[1])) - - return ''.join(words) - - -__test__ = { - 'no_punc': r''' - - Test the highlighter's behaviour when there is no punctuation in the sample - text (regression test - used to return no output): - >>> hl = Highlighter("en") - >>> hl.makeSample('Hello world', ['world']) - 'Hello world' - - ''', - - 'stem_levels': r''' - - Test highlighting of words, and how it works with stemming: - >>> hl = Highlighter("en") - - # "word" and "wording" stem to "word", so the following 4 calls all return - # the same thing - >>> hl.makeSample('Hello. word. wording. wordinging.', ['word'], hl='<>') - 'Hello. <word>. <wording>. wordinging.' - >>> hl.highlight('Hello. word. wording. wordinging.', ['word'], '<>') - 'Hello. <word>. <wording>. wordinging.' - >>> hl.makeSample('Hello. word. wording. wordinging.', ['wording'], hl='<>') - 'Hello. <word>. <wording>. wordinging.' - >>> hl.highlight('Hello. word. wording. wordinging.', ['wording'], '<>') - 'Hello. <word>. <wording>. wordinging.' 
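Editor's note: the highlighting above hinges on stemming: query terms and document words are both reduced to stems before comparison. A small check with the real Xapian stemmer (assumes the `xapian` module):

```python
# stem('word') == stem('wording') == 'word', while 'wordinging' stems to
# 'wording', which is why the last word is never highlighted above.
import xapian

stem = xapian.Stem("en")
terms = {stem("word")}
words = ["word", "wording", "wordinging"]
print([w for w in words if stem(w.lower()) in terms])
# ['word', 'wording']
```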
- - # "wordinging" stems to "wording", so only the last word is highlighted for - # this one. - >>> hl.makeSample('Hello. word. wording. wordinging.', ['wordinging'], hl='<>') - 'Hello. word. wording. <wordinging>.' - >>> hl.highlight('Hello. word. wording. wordinging.', ['wordinging'], '<>') - 'Hello. word. wording. <wordinging>.' - ''', - - 'supplied_stemmer': r''' - - Test behaviour if we pass in our own stemmer: - >>> stem = xapian.Stem('en') - >>> hl = Highlighter(stemmer=stem) - >>> hl.highlight('Hello. word. wording. wordinging.', ['word'], '<>') - 'Hello. <word>. <wording>. wordinging.' - - ''', - - 'unicode': r''' - - Test behaviour if we pass in unicode input: - >>> hl = Highlighter('en') - >>> hl.highlight(u'Hello\xf3. word. wording. wordinging.', ['word'], '<>') - 'Hello\xc3\xb3. <word>. <wording>. wordinging.' - - ''', - - 'no_sample': r''' - - Test behaviour if we pass in unicode input: - >>> hl = Highlighter('en') - >>> hl.makeSample(u'', ['word']) - '' - - ''', - - 'short_samples': r''' - - >>> hl = Highlighter('en') - >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['hello'], 20, ('<', '>')) - '.. <Hello> world ..' - >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['hello'], 40, ('<', '>')) - 'A boring start. <Hello> world indeed...' - >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['boring'], 40, ('<', '>')) - 'A <boring> start... A <boring> end.' - - ''', - - 'apostrophes': r''' - - >>> hl = Highlighter('en') - >>> hl.makeSample("A boring start. Hello world's indeed. A boring end.", ['world'], 40, ('<', '>')) - "A boring start. Hello <world's> indeed..." - - ''', - -} - -if __name__ == '__main__': - import doctest, sys - doctest.testmod (sys.modules[__name__]) diff --git a/secore/indexerconnection.py b/secore/indexerconnection.py deleted file mode 100644 index 84ea9a1..0000000 --- a/secore/indexerconnection.py +++ /dev/null @@ -1,382 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""indexerconnection.py: A connection to the search engine for indexing. - -""" -__docformat__ = "restructuredtext en" - -import xapian as _xapian -from datastructures import * -from fieldactions import * -import fieldmappings as _fieldmappings -import errors as _errors -import os as _os -import cPickle as _cPickle - -class IndexerConnection(object): - """A connection to the search engine for indexing. - - """ - - def __init__(self, indexpath): - """Create a new connection to the index. - - There may only be one indexer connection for a particular database open - at a given time. Therefore, if a connection to the database is already - open, this will raise a xapian.DatabaseLockError. - - If the database doesn't already exist, it will be created. 
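Editor's note: at the Xapian level, the constructor above reduces to one call (real API, taken from the deleted code itself):

```python
# DB_CREATE_OR_OPEN gives the create-if-missing behaviour; holding the
# WritableDatabase open is what makes a second indexer connection to the
# same path raise xapian.DatabaseLockError.
import xapian

index = xapian.WritableDatabase("indexpath", xapian.DB_CREATE_OR_OPEN)
```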
- - """ - self._index = _xapian.WritableDatabase(indexpath, _xapian.DB_CREATE_OR_OPEN) - self._indexpath = indexpath - - # Read existing actions. - self._field_actions = {} - self._field_mappings = _fieldmappings.FieldMappings() - self._next_docid = 0 - self._config_modified = False - self._load_config() - - def _store_config(self): - """Store the configuration for the database. - - Currently, this stores the configuration in a file in the database - directory, so changes to it are not protected by transactions. When - support is available in xapian for storing metadata associated with - databases. this will be used instead of a file. - - """ - config_str = _cPickle.dumps(( - self._field_actions, - self._field_mappings.serialise(), - self._next_docid, - ), 2) - config_file = _os.path.join(self._indexpath, 'config') - fd = open(config_file, "w") - fd.write(config_str) - fd.close() - self._config_modified = False - - def _load_config(self): - """Load the configuration for the database. - - """ - config_file = _os.path.join(self._indexpath, 'config') - if not _os.path.exists(config_file): - return - fd = open(config_file) - config_str = fd.read() - if not config_str: - return - fd.close() - - (self._field_actions, mappings, self._next_docid) = _cPickle.loads(config_str) - self._field_mappings = _fieldmappings.FieldMappings(mappings) - self._config_modified = False - - def _allocate_id(self): - """Allocate a new ID. - - """ - while True: - idstr = "%x" % self._next_docid - self._next_docid += 1 - if not self._index.term_exists('Q' + idstr): - break - self._config_modified = True - return idstr - - def add_field_action(self, fieldname, fieldtype, **kwargs): - """Add an action to be performed on a field. - - Note that this change to the configuration will not be preserved on - disk until the next call to flush(). - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - if fieldname in self._field_actions: - actions = self._field_actions[fieldname] - else: - actions = FieldActions(fieldname) - self._field_actions[fieldname] = actions - actions.add(self._field_mappings, fieldtype, **kwargs) - self._config_modified = True - - def clear_field_actions(self, fieldname): - """Clear all actions for the specified field. - - This does not report an error if there are already no actions for the - specified field. - - Note that this change to the configuration will not be preserved on - disk until the next call to flush(). - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - if fieldname in self._field_actions: - del self._field_actions[fieldname] - self._config_modified = True - - def process(self, document): - """Process an UnprocessedDocument with the settings in this database. - - The resulting ProcessedDocument is returned. - - Note that this processing will be automatically performed if an - UnprocessedDocument is supplied to the add() or replace() methods of - IndexerConnection. This method is exposed to allow the processing to - be performed separately, which may be desirable if you wish to manually - modify the processed document before adding it to the database, or if - you want to split processing of documents from adding documents to the - database for performance reasons. 
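Editor's note: putting the pieces together, the indexing flow this connection supports looks like the following hedged sketch (assumes secore plus the UnprocessedDocument and Field classes shown earlier in this diff):

```python
# Either hand add() an UnprocessedDocument and let it process implicitly,
# or call process() yourself to adjust the ProcessedDocument first.
import secore

conn = secore.IndexerConnection("index_dir")
doc = secore.UnprocessedDocument()
doc.fields.append(secore.Field("title", "Hello world"))

processed = conn.process(doc)   # explicit step (optional)
doc_id = conn.add(processed)    # returns the allocated id
conn.flush()                    # persist documents and config
conn.close()
```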
- - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - result = ProcessedDocument(self._field_mappings) - result.id = document.id - context = ActionContext(self._index) - - for field in document.fields: - try: - actions = self._field_actions[field.name] - except KeyError: - # If no actions are defined, just ignore the field. - continue - actions.perform(result, field.value, context) - - return result - - def add(self, document): - """Add a new document to the search engine index. - - If the document has a id set, and the id already exists in - the database, an exception will be raised. Use the replace() method - instead if you wish to overwrite documents. - - Returns the id of the newly added document (making up a new - unique ID if no id was set). - - The supplied document may be an instance of UnprocessedDocument, or an - instance of ProcessedDocument. - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - if not hasattr(document, '_doc'): - # It's not a processed document. - document = self.process(document) - - # Ensure that we have a id - orig_id = document.id - if orig_id is None: - id = self._allocate_id() - document.id = id - else: - id = orig_id - if self._index.term_exists('Q' + id): - raise _errors.IndexerError("Document ID of document supplied to add() is not unique.") - - # Add the document. - xapdoc = document.prepare() - self._index.add_document(xapdoc) - - if id is not orig_id: - document.id = orig_id - return id - - def replace(self, document): - """Replace a document in the search engine index. - - If the document does not have a id set, an exception will be - raised. - - If the document has a id set, and the id does not already - exist in the database, this method will have the same effect as add(). - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - if not hasattr(document, '_doc'): - # It's not a processed document. - document = self.process(document) - - # Ensure that we have a id - id = document.id - if id is None: - raise _errors.IndexerError("No document ID set for document supplied to replace().") - - xapdoc = document.prepare() - self._index.replace_document('Q' + id, xapdoc) - - def delete(self, id): - """Delete a document from the search engine index. - - If the id does not already exist in the database, this method - will have no effect (and will not report an error). - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - self._index.delete_document('Q' + id) - - def flush(self): - """Apply recent changes to the database. - - If an exception occurs, any changes since the last call to flush() may - be lost. - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - if self._config_modified: - self._store_config() - self._index.flush() - - def close(self): - """Close the connection to the database. - - It is important to call this method before allowing the class to be - garbage collected, because it will ensure that any un-flushed changes - will be flushed. It also ensures that the connection is cleaned up - promptly. - - No other methods may be called on the connection after this has been - called. (It is permissible to call close() multiple times, but - only the first call will have any effect.) - - If an exception occurs, the database will be closed, but changes since - the last call to flush may be lost. 
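Editor's note: one allocator detail from earlier in this file: _allocate_id hands out hex-encoded counters, skipping any value whose 'Q' term already exists (for example, ids supplied by callers). Stand-alone sketch:

```python
# Hex-encode, bump the counter, retry while the candidate id is taken.
def allocate_id(next_docid, term_exists):
    while True:
        idstr = "%x" % next_docid
        next_docid += 1
        if not term_exists("Q" + idstr):
            return idstr, next_docid

# 'Qa' (10) is already taken, so the allocator skips to 'b' (11):
assert allocate_id(10, lambda t: t == "Qa") == ("b", 12)
```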
- - """ - if self._index is None: - return - try: - self.flush() - finally: - # There is currently no "close()" method for xapian databases, so - # we have to rely on the garbage collector. Since we never copy - # the _index property out of this class, there should be no cycles, - # so the standard python implementation should garbage collect - # _index straight away. A close() method is planned to be added to - # xapian at some point - when it is, we should call it here to make - # the code more robust. - self._index = None - self._indexpath = None - self._field_actions = None - self._config_modified = False - - def get_doccount(self): - """Count the number of documents in the database. - - This count will include documents which have been added or removed but - not yet flushed(). - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - return self._index.get_doccount() - - def iterids(self): - """Get an iterator which returns all the ids in the database. - - The unqiue_ids are currently returned in binary lexicographical sort - order, but this should not be relied on. - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - return PrefixedTermIter('Q', self._index.allterms()) - - def get_document(self, id): - """Get the document with the specified unique ID. - - Raises a KeyError if there is no such document. Otherwise, it returns - a ProcessedDocument. - - """ - if self._index is None: - raise _errors.IndexerError("IndexerConnection has been closed") - postlist = self._index.postlist('Q' + id) - try: - plitem = postlist.next() - except StopIteration: - # Unique ID not found - raise KeyError('Unique ID %r not found' % id) - try: - postlist.next() - raise _errors.IndexerError("Multiple documents " #pragma: no cover - "found with same unique ID") - except StopIteration: - # Only one instance of the unique ID found, as it should be. - pass - - result = ProcessedDocument(self._field_mappings) - result.id = id - result._doc = self._index.get_document(plitem.docid) - return result - -class PrefixedTermIter(object): - """Iterate through all the terms with a given prefix. - - """ - def __init__(self, prefix, termiter): - """Initialise the prefixed term iterator. - - - `prefix` is the prefix to return terms for. - - `termiter` is a xapian TermIterator, which should be at it's start. - - """ - - # The algorithm used in next() currently only works for single - # character prefixes, so assert that the prefix is single character. - # To deal with multicharacter prefixes, we need to check for terms - # which have a starting prefix equal to that given, but then have a - # following uppercase alphabetic character, indicating that the actual - # prefix is longer than the target prefix. We then need to skip over - # these. Not too hard to implement, but we don't need it yet. - assert(len(prefix) == 1) - - self._started = False - self._prefix = prefix - self._prefixlen = len(prefix) - self._termiter = termiter - - def __iter__(self): - return self - - def next(self): - """Get the next term with the specified prefix. 
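Editor's note: PrefixedTermIter's skip_to-and-strip logic, reduced to plain Python over a sorted term list (a sketch; the real class works on a Xapian TermIterator):

```python
# Yield terms sharing `prefix` with the prefix stripped; sorted input
# means iteration can stop at the first term past the prefix range.
def prefixed_terms(sorted_terms, prefix):
    for term in sorted_terms:
        if term.startswith(prefix):
            yield term[len(prefix):]
        elif term > prefix:
            break  # nothing with this prefix can follow

assert list(prefixed_terms(["Pa", "Q1", "Q2", "R"], "Q")) == ["1", "2"]
```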
- - - """ - if not self._started: - term = self._termiter.skip_to(self._prefix).term - self._started = True - else: - term = self._termiter.next().term - if len(term) < self._prefixlen or term[:self._prefixlen] != self._prefix: - raise StopIteration - return term[self._prefixlen:] - -if __name__ == '__main__': - import doctest, sys - doctest.testmod (sys.modules[__name__]) diff --git a/secore/marshall.py b/secore/marshall.py deleted file mode 100644 index ebcc71d..0000000 --- a/secore/marshall.py +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""marshall.py: Marshal values into strings - -""" -__docformat__ = "restructuredtext en" - -import math - -def _long_to_base256_array(value, length, flip): - result = [] - for i in xrange(length): - n = value % 256 - if flip: n = 255 - n - result.insert(0, chr(n)) - value /= 256 - return result - -def float_to_string(value): - """Marshall a floating point number to a string which sorts in the - appropriate manner. - - """ - mantissa, exponent = math.frexp(value) - sign = '1' - if mantissa < 0: - mantissa = -mantissa - sign = '0' - - # IEEE representation of doubles uses 11 bits for the exponent, with a bias - # of 1023. There's then another 52 bits in the mantissa, so we need to - # add 1075 to be sure that the exponent won't be negative. - # Even then, we check that the exponent isn't negative, and consider the - # value to be equal to zero if it is. - exponent += 1075 - if exponent < 0: # Note - this can't happen on most architectures #pragma: no cover - exponent = 0 - mantissa = 0 - elif mantissa == 0: - exponent = 0 - - # IEEE representation of doubles uses 52 bits for the mantissa. Convert it - # to a 7 character string, and convert the exponent to a 2 character - # string. - - mantissa = long(mantissa * (2**52)) - - digits = [sign] - digits.extend(_long_to_base256_array(exponent, 2, sign == '0')) - digits.extend(_long_to_base256_array(mantissa, 7, sign == '0')) - - return ''.join(digits) - -def date_to_string(date): - """Marshall a date to a string which sorts in the appropriate manner. - - """ - return '%04d%02d%02d' % (date.year, date.month, date.day) diff --git a/secore/parsedate.py b/secore/parsedate.py deleted file mode 100644 index 684d5f2..0000000 --- a/secore/parsedate.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. 
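Editor's note: the float marshalling deleted just above deserves a sanity check of the invariant it exists for. A Python 3 port (the original is Python 2; `//=` replaces its integer `/=`, and keys are bytes rather than str):

```python
import math

def _base256(value, length, flip):
    out = []
    for _ in range(length):
        n = value % 256
        out.insert(0, 255 - n if flip else n)
        value //= 256
    return bytes(out)

def float_to_key(value):
    # Sign byte, then base-256 exponent and mantissa; bytes are flipped
    # for negatives so byte-wise order matches numeric order.
    mantissa, exponent = math.frexp(value)
    sign, flip = (b"1", False) if mantissa >= 0 else (b"0", True)
    mantissa = abs(mantissa)
    exponent += 1075              # IEEE double exponent bias (1023) + 52
    if exponent < 0 or mantissa == 0:
        exponent, mantissa = 0, 0
    return (sign + _base256(exponent, 2, flip)
                 + _base256(int(mantissa * (1 << 52)), 7, flip))

# Keys must sort exactly like the floats they encode:
vals = [-2.5, -0.1, 0.0, 1e-9, 3.14, 42.0]
assert sorted(vals, key=float_to_key) == vals
```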
-# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""parsedate.py: Parse date strings. - -""" -__docformat__ = "restructuredtext en" - -import datetime -import re - -yyyymmdd_re = re.compile(r'(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})$') -yyyy_mm_dd_re = re.compile(r'(?P<year>[0-9]{4})([-/.])(?P<month>[0-9]{2})\2(?P<day>[0-9]{2})$') - -def date_from_string(value): - """Parse a string into a date. - - If the value supplied is already a date-like object (ie, has 'year', - 'month' and 'day' attributes), it is returned without processing. - - Supported date formats are: - - - YYYYMMDD - - YYYY-MM-DD - - YYYY/MM/DD - - YYYY.MM.DD - - """ - if (hasattr(value, 'year') - and hasattr(value, 'month') - and hasattr(value, 'day')): - return value - - mg = yyyymmdd_re.match(value) - if mg is None: - mg = yyyy_mm_dd_re.match(value) - - if mg is not None: - year, month, day = (int(i) for i in mg.group('year', 'month', 'day')) - return datetime.date(year, month, day) - - raise ValueError('Unrecognised date format') diff --git a/secore/searchconnection.py b/secore/searchconnection.py deleted file mode 100644 index 79fa509..0000000 --- a/secore/searchconnection.py +++ /dev/null @@ -1,618 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2007 Lemur Consulting Ltd -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -r"""searchconnection.py: A connection to the search engine for searching. - -""" -__docformat__ = "restructuredtext en" - -import xapian as _xapian -from datastructures import * -from fieldactions import * -import fieldmappings as _fieldmappings -import highlight as _highlight -import errors as _errors -import os as _os -import cPickle as _cPickle - -class SearchResult(ProcessedDocument): - """A result from a search. - - """ - def __init__(self, msetitem, results): - ProcessedDocument.__init__(self, results._fieldmappings, msetitem.document) - self.rank = msetitem.rank - self._results = results - - def _get_language(self, field): - """Get the language that should be used for a given field. - - """ - actions = self._results._conn._field_actions[field]._actions - for action, kwargslist in actions.iteritems(): - if action == FieldActions.INDEX_FREETEXT: - for kwargs in kwargslist: - try: - return kwargs['language'] - except KeyError: - pass - return 'none' - - def summarise(self, field, maxlen=600, hl=('<b>', '</b>')): - """Return a summarised version of the field specified. 
- - This will return a summary of the contents of the field stored in the - search result, with words which match the query highlighted. - - The maximum length of the summary (in characters) may be set using the - maxlen parameter. - - The return value will be a string holding the summary, with - highlighting applied. If there are multiple instances of the field in - the document, the instances will be joined with a newline character. - - To turn off highlighting, set hl to None. Each highlight will consist - of the first entry in the `hl` list being placed before the word, and - the second entry in the `hl` list being placed after the word. - - Any XML or HTML style markup tags in the field will be stripped before - the summarisation algorithm is applied. - - """ - highlighter = _highlight.Highlighter(language_code=self._get_language(field)) - field = self.data[field] - results = [] - text = '\n'.join(field) - return highlighter.makeSample(text, self._results._query, maxlen, hl) - - def highlight(self, field, hl=('<b>', '</b>'), strip_tags=False): - """Return a highlighted version of the field specified. - - This will return all the contents of the field stored in the search - result, with words which match the query highlighted. - - The return value will be a list of strings (corresponding to the list - of strings which is the raw field data). - - Each highlight will consist of the first entry in the `hl` list being - placed before the word, and the second entry in the `hl` list being - placed after the word. - - If `strip_tags` is True, any XML or HTML style markup tags in the field - will be stripped before highlighting is applied. - - """ - highlighter = _highlight.Highlighter(language_code=self._get_language(field)) - field = self.data[field] - results = [] - for text in field: - results.append(highlighter.highlight(text, self._results._query, hl, strip_tags)) - return results - - def __repr__(self): - return ('<SearchResult(rank=%d, id=%r, data=%r)>' % - (self.rank, self.id, self.data)) - - -class SearchResultIter(object): - """An iterator over a set of results from a search. - - """ - def __init__(self, results): - self._results = results - self._iter = iter(results._mset) - - def next(self): - msetitem = self._iter.next() - return SearchResult(msetitem, - self._results) - - -class SearchResults(object): - """A set of results of a search. - - """ - def __init__(self, conn, enq, query, mset, fieldmappings): - self._conn = conn - self._enq = enq - self._query = query - self._mset = mset - self._fieldmappings = fieldmappings - - def __repr__(self): - return ("<SearchResults(startrank=%d, " - "endrank=%d, " - "more_matches=%s, " - "matches_lower_bound=%d, " - "matches_upper_bound=%d, " - "matches_estimated=%d, " - "estimate_is_exact=%s)>" % - ( - self.startrank, - self.endrank, - self.more_matches, - self.matches_lower_bound, - self.matches_upper_bound, - self.matches_estimated, - self.estimate_is_exact, - )) - - def _get_more_matches(self): - # This check relies on us having asked for at least one more result - # than retrieved to be checked. - return (self.matches_lower_bound > self.endrank) - more_matches = property(_get_more_matches, doc= - """Check whether there are further matches after those in this result set. - - """) - def _get_startrank(self): - return self._mset.get_firstitem() - startrank = property(_get_startrank, doc= - """Get the rank of the first item in the search results. - - This corresponds to the "startrank" parameter passed to the search() method. 
-
-    """)
-    def _get_endrank(self):
-        return self._mset.get_firstitem() + len(self._mset)
-    endrank = property(_get_endrank, doc=
-    """Get the rank of the item after the end of the search results.

-    If there are sufficient results in the index, this corresponds to the
-    "endrank" parameter passed to the search() method.
-
-    """)
-    def _get_lower_bound(self):
-        return self._mset.get_matches_lower_bound()
-    matches_lower_bound = property(_get_lower_bound, doc=
-    """Get a lower bound on the total number of matching documents.
-
-    """)
-    def _get_upper_bound(self):
-        return self._mset.get_matches_upper_bound()
-    matches_upper_bound = property(_get_upper_bound, doc=
-    """Get an upper bound on the total number of matching documents.
-
-    """)
-    def _get_estimated(self):
-        return self._mset.get_matches_estimated()
-    matches_estimated = property(_get_estimated, doc=
-    """Get an estimate for the total number of matching documents.
-
-    """)
-    def _estimate_is_exact(self):
-        return self._mset.get_matches_lower_bound() == \
-               self._mset.get_matches_upper_bound()
-    estimate_is_exact = property(_estimate_is_exact, doc=
-    """Check whether the estimated number of matching documents is exact.
-
-    If this returns true, the estimate given by the `matches_estimated`
-    property is guaranteed to be correct.
-
-    If this returns false, it is possible that the actual number of matching
-    documents is different from the number given by the `matches_estimated`
-    property.
-
-    """)
-
-    def get_hit(self, index):
-        """Get the hit with a given index.
-
-        """
-        msetitem = self._mset.get_hit(index)
-        return SearchResult(msetitem, self)
-    __getitem__ = get_hit
-
-    def __iter__(self):
-        """Get an iterator over the hits in the search result.
-
-        The iterator returns the results in increasing order of rank.
-
-        """
-        return SearchResultIter(self)
-
-class SearchConnection(object):
-    """A connection to the search engine for searching.
-
-    The connection will access a view of the database.
-
-    """
-
-    def __init__(self, indexpath):
-        """Create a new connection to the index for searching.
-
-        Any number of search connections may be open for a particular
-        database at a given time (regardless of whether a connection for
-        indexing is open as well).
-
-        If the database doesn't exist, an exception will be raised.
-
-        """
-        self._index = _xapian.Database(indexpath)
-        self._indexpath = indexpath
-
-        # Read the actions.
-        self._load_config()
-
-    def _get_sort_type(self, field):
-        """Get the sort type that should be used for a given field.
-
-        """
-        actions = self._field_actions[field]._actions
-        for action, kwargslist in actions.iteritems():
-            if action == FieldActions.SORT_AND_COLLAPSE:
-                for kwargs in kwargslist:
-                    return kwargs['type']
-
-    def _load_config(self):
-        """Load the configuration for the database.
-
-        """
-        # Note: this code is basically duplicated in the IndexerConnection
-        # class.  Move it to a shared location.
-        config_file = _os.path.join(self._indexpath, 'config')
-        if not _os.path.exists(config_file):
-            self._field_mappings = _fieldmappings.FieldMappings()
-            return
-        fd = open(config_file)
-        config_str = fd.read()
-        fd.close()
-
-        (self._field_actions, mappings, next_docid) = _cPickle.loads(config_str)
-        self._field_mappings = _fieldmappings.FieldMappings(mappings)
-
-    def reopen(self):
-        """Reopen the connection.
-
-        This updates the revision of the index which the connection references
-        to the latest flushed revision.
- - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - self._index.reopen() - # Re-read the actions. - self._load_config() - - def close(self): - """Close the connection to the database. - - It is important to call this method before allowing the class to be - garbage collected to ensure that the connection is cleaned up promptly. - - No other methods may be called on the connection after this has been - called. (It is permissible to call close() multiple times, but - only the first call will have any effect.) - - If an exception occurs, the database will be closed, but changes since - the last call to flush may be lost. - - """ - if self._index is None: - return - # There is currently no "close()" method for xapian databases, so - # we have to rely on the garbage collector. Since we never copy - # the _index property out of this class, there should be no cycles, - # so the standard python implementation should garbage collect - # _index straight away. A close() method is planned to be added to - # xapian at some point - when it is, we should call it here to make - # the code more robust. - self._index = None - self._indexpath = None - self._field_actions = None - self._field_mappings = None - - def get_doccount(self): - """Count the number of documents in the database. - - This count will include documents which have been added or removed but - not yet flushed(). - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - return self._index.get_doccount() - - def get_document(self, id): - """Get the document with the specified unique ID. - - Raises a KeyError if there is no such document. Otherwise, it returns - a ProcessedDocument. - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - postlist = self._index.postlist('Q' + id) - try: - plitem = postlist.next() - except StopIteration: - # Unique ID not found - raise KeyError('Unique ID %r not found' % id) - try: - postlist.next() - raise _errors.SearchError("Multiple documents " #pragma: no cover - "found with same unique ID") - except StopIteration: - # Only one instance of the unique ID found, as it should be. - pass - - result = ProcessedDocument(self._field_mappings) - result.id = id - result._doc = self._index.get_document(plitem.docid) - return result - - OP_AND = _xapian.Query.OP_AND - OP_OR = _xapian.Query.OP_OR - def query_composite(self, operator, queries): - """Build a composite query from a list of queries. - - The queries are combined with the supplied operator, which is either - SearchConnection.OP_AND or SearchConnection.OP_OR. - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - return _xapian.Query(operator, list(queries)) - - def query_filter(self, query, filter): - """Filter a query with another query. - - Documents will only match the resulting query if they match both - queries, but will be weighted according to only the first query. - - - `query`: The query to filter. - - `filter`: The filter to apply to the query. - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - if not isinstance(filter, _xapian.Query): - raise _errors.SearchError("Filter must be a Xapian Query object") - return _xapian.Query(_xapian.Query.OP_FILTER, query, filter) - - def query_range(self, field, begin, end): - """Create a query for a range search. 
- - This creates a query which matches only those documents which have a - field value in the specified range. - - Begin and end must be appropriate values for the field, according to - the 'type' parameter supplied to the SORTABLE action for the field. - - The begin and end values are both inclusive - any documents with a - value equal to begin or end will be returned (unless end is less than - begin, in which case no documents will be returned). - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - - sorttype = self._get_sort_type(field) - marshaller = SortableMarshaller(False) - fn = marshaller.get_marshall_function(field, sorttype) - begin = fn(field, begin) - end = fn(field, end) - - slot = self._field_mappings.get_slot(field) - return _xapian.Query(_xapian.Query.OP_VALUE_RANGE, slot, begin, end) - - def _prepare_queryparser(self, allow, deny, default_op): - """Prepare (and return) a query parser using the specified fields and - operator. - - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - if allow is not None and deny is not None: - raise _errors.SearchError("Cannot specify both `allow` and `deny`") - qp = _xapian.QueryParser() - qp.set_database(self._index) - qp.set_default_op(default_op) - - if allow is None: - allow = [key for key in self._field_actions] - if deny is not None: - allow = [key for key in allow if key not in deny] - - for field in allow: - actions = self._field_actions[field]._actions - for action, kwargslist in actions.iteritems(): - if action == FieldActions.INDEX_EXACT: - # FIXME - need patched version of xapian to add exact prefixes - #qp.add_exact_prefix(field, self._field_mappings.get_prefix(field)) - qp.add_prefix(field, self._field_mappings.get_prefix(field)) - if action == FieldActions.INDEX_FREETEXT: - qp.add_prefix(field, self._field_mappings.get_prefix(field)) - for kwargs in kwargslist: - try: - lang = kwargs['language'] - qp.set_stemmer(_xapian.Stem(lang)) - qp.set_stemming_strategy(qp.STEM_SOME) - except KeyError: - pass - return qp - - def query_parse(self, string, allow=None, deny=None, default_op=OP_AND): - """Parse a query string. - - This is intended for parsing queries entered by a user. If you wish to - combine structured queries, it is generally better to use the other - query building methods, such as `query_composite`. - - - `string`: The string to parse. - - `allow`: A list of fields to allow in the query. - - `deny`: A list of fields not to allow in the query. - - Only one of `allow` and `deny` may be specified. - - If any of the entries in `allow` or `deny` are not present in the - configuration for the database, an exception will be raised. - - Returns a Query object, which may be passed to the search() method, or - combined with other queries. - - """ - qp = self._prepare_queryparser(allow, deny, default_op) - try: - return qp.parse_query(string) - except _xapian.QueryParserError, e: - # If we got a parse error, retry without boolean operators (since - # these are the usual cause of the parse error). - return qp.parse_query(string, 0) - - def query_field(self, field, value, default_op=OP_AND): - """A query for a single field. 
- - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - try: - actions = self._field_actions[field]._actions - except KeyError: - actions = {} - - # need to check on field type, and stem / split as appropriate - for action, kwargslist in actions.iteritems(): - if action == FieldActions.INDEX_EXACT: - prefix = self._field_mappings.get_prefix(field) - if len(value) > 0: - chval = ord(value[0]) - if chval >= ord('A') and chval <= ord('Z'): - prefix = prefix + ':' - return _xapian.Query(prefix + value) - if action == FieldActions.INDEX_FREETEXT: - qp = _xapian.QueryParser() - qp.set_default_op(default_op) - prefix = self._field_mappings.get_prefix(field) - for kwargs in kwargslist: - try: - lang = kwargs['language'] - qp.set_stemmer(_xapian.Stem(lang)) - qp.set_stemming_strategy(qp.STEM_SOME) - except KeyError: - pass - return qp.parse_query(value, - qp.FLAG_PHRASE | qp.FLAG_BOOLEAN | qp.FLAG_LOVEHATE, - prefix) - - return _xapian.Query() - - def query_all(self): - """A query which matches all the documents in the database. - - """ - return _xapian.Query('') - - def spell_correct(self, string, allow=None, deny=None): - """Correct a query spelling. - - This returns a version of the query string with any misspelt words - corrected. - - - `allow`: A list of fields to allow in the query. - - `deny`: A list of fields not to allow in the query. - - Only one of `allow` and `deny` may be specified. - - If any of the entries in `allow` or `deny` are not present in the - configuration for the database, an exception will be raised. - - """ - qp = self._prepare_queryparser(allow, deny, self.OP_AND) - qp.parse_query(string, qp.FLAG_PHRASE|qp.FLAG_BOOLEAN|qp.FLAG_LOVEHATE|qp.FLAG_SPELLING_CORRECTION) - corrected = qp.get_corrected_query_string() - if len(corrected) == 0: - if isinstance(string, unicode): - # Encode as UTF-8 for consistency - this happens automatically - # to values passed to Xapian. - return string.encode('utf-8') - return string - return corrected - - def search(self, query, startrank, endrank, - checkatleast=0, sortby=None, collapse=None): - """Perform a search, for documents matching a query. - - - `query` is the query to perform. - - `startrank` is the rank of the start of the range of matching - documents to return (ie, the result with this rank will be returned). - ranks start at 0, which represents the "best" matching document. - - `endrank` is the rank at the end of the range of matching documents - to return. This is exclusive, so the result with this rank will not - be returned. - - `checkatleast` is the minimum number of results to check for: the - estimate of the total number of matches will always be exact if - the number of matches is less than `checkatleast`. - - `sortby` is the name of a field to sort by. It may be preceded by a - '+' or a '-' to indicate ascending or descending order - (respectively). If the first character is neither '+' or '-', the - sort will be in ascending order. - - `collapse` is the name of a field to collapse the result documents - on. If this is specified, there will be at most one result in the - result set for each value of the field. 
- - """ - if self._index is None: - raise _errors.SearchError("SearchConnection has been closed") - enq = _xapian.Enquire(self._index) - enq.set_query(query) - - if sortby is not None: - asc = True - if sortby[0] == '-': - asc = False - sortby = sortby[1:] - elif sortby[0] == '+': - sortby = sortby[1:] - - try: - slotnum = self._field_mappings.get_slot(sortby) - except KeyError: - raise _errors.SearchError("Field %r was not indexed for sorting" % sortby) - - # Note: we invert the "asc" parameter, because xapian treats - # "ascending" as meaning "higher values are better"; in other - # words, it considers "ascending" to mean return results in - # descending order. - enq.set_sort_by_value_then_relevance(slotnum, not asc) - - if collapse is not None: - try: - slotnum = self._field_mappings.get_slot(collapse) - except KeyError: - raise _errors.SearchError("Field %r was not indexed for collapsing" % collapse) - enq.set_collapse_key(slotnum) - - maxitems = max(endrank - startrank, 0) - # Always check for at least one more result, so we can report whether - # there are more matches. - checkatleast = max(checkatleast, endrank + 1) - - enq.set_docid_order(enq.DONT_CARE) - - # Repeat the search until we don't get a DatabaseModifiedError - while True: - try: - mset = enq.get_mset(startrank, maxitems, checkatleast) - break - except _xapian.DatabaseModifiedError, e: - self.reopen() - return SearchResults(self, enq, query, mset, self._field_mappings) - -if __name__ == '__main__': - import doctest, sys - doctest.testmod (sys.modules[__name__]) diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am index 480a5a7..062e9b0 100644 --- a/src/olpc/datastore/Makefile.am +++ b/src/olpc/datastore/Makefile.am @@ -1,11 +1,23 @@ datastoredir = $(pythondir)/olpc/datastore datastore_PYTHON = \ __init__.py \ - backingstore.py \ - bin_copy.py \ - converter.py \ datastore.py \ - model.py \ - xapianindex.py \ - utils.py \ - __version__.py + filestore.py \ + indexstore.py \ + layoutmanager.py \ + metadatastore.py \ + migration.py \ + optimizer.py + +AM_CPPFLAGS = \ + $(WARN_CFLAGS) \ + $(EXT_CFLAGS) \ + $(PYTHON_INCLUDES) + +AM_LDFLAGS = -module -avoid-version + +pkgpyexecdir = $(pythondir)/olpc/datastore +pkgpyexec_LTLIBRARIES = metadatareader.la + +metadatareader_la_SOURCES = \ + metadatareader.c diff --git a/src/olpc/datastore/__init__.py b/src/olpc/datastore/__init__.py index fd38d75..8b13789 100644 --- a/src/olpc/datastore/__init__.py +++ b/src/olpc/datastore/__init__.py @@ -1,5 +1 @@ -# datastore package -from olpc.datastore.datastore import DataStore, DS_LOG_CHANNEL - - diff --git a/src/olpc/datastore/__version__.py b/src/olpc/datastore/__version__.py deleted file mode 100644 index 98d1028..0000000 --- a/src/olpc/datastore/__version__.py +++ /dev/null @@ -1,15 +0,0 @@ -## -## src/olpc/datastore/__version__.py -- Version Information for unknown (syntax: Python) -## [automatically generated and maintained by GNU shtool] -## - -class version: - v_hex = 0x002200 - v_short = "0.2.0" - v_long = "0.2.0 (08-May-2007)" - v_tex = "This is unknown, Version 0.2.0 (08-May-2007)" - v_gnu = "unknown 0.2.0 (08-May-2007)" - v_web = "unknown/0.2.0" - v_sccs = "@(#)unknown 0.2.0 (08-May-2007)" - v_rcs = "$Id: unknown 0.2.0 (08-May-2007) $" - diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py deleted file mode 100644 index cd23680..0000000 --- a/src/olpc/datastore/backingstore.py +++ /dev/null @@ -1,987 +0,0 @@ -""" -olpc.datastore.backingstore -~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
-management of stable storage for the datastore - -""" - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - -import cPickle as pickle -from datetime import datetime -import gnomevfs -import os -import re -import sha -import subprocess -import time -import threading -import errno -import shutil -import urllib -import traceback -import sys - -import dbus -import xapian -import gobject - -try: - import cjson - has_cjson = True -except ImportError: - import simplejson - has_cjson = False - -from olpc.datastore.xapianindex import IndexManager -from olpc.datastore import bin_copy -from olpc.datastore import utils -from olpc.datastore import model - -# changing this pattern impacts _targetFile -filename_attempt_pattern = re.compile('\(\d+\)$') - -import logging -DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' -logger = logging.getLogger(DS_LOG_CHANNEL) -#logger.setLevel(logging.DEBUG) - -class BackingStore(object): - """Backing stores manage stable storage. We abstract out the - management of file/blob storage through this class, as well as the - location of the backing store itself (it may be done via a network - connection for example). - - While the backingstore is responsible for implementing the - metadata interface no implementation is provided here. It is - assumed by that interface that all the features enumerated in - olpc.datastore.model are provided. - - """ - def __init__(self, uri, **kwargs): - """The kwargs are used to configure the backend so it can - provide its interface. See specific backends for details - """ - pass - - def __repr__(self): - return "<%s %s: %s %s>" % (self.__class__.__name__, self.id, - self.title, self.uri) - # Init phases - @staticmethod - def parse(uri): - """parse the uri into an actionable mount-point. - Returns True or False indicating if this backend handles a - given uri. - """ - return False - - def initialize_and_load(self): - """phase to check the state of the located mount point, this - method returns True (mount point is valid) or False (invalid - or uninitialized mount point). - - self.check() which must return a boolean should check if the - result of self.locate() is already a datastore and then - initialize/load it according to self.options. - - When True self.load() is invoked. - When False self.create() followed by self.load() is invoked. - """ - if self.check() is False: - self.initialize() - self.load() - - def check(self): - return False - - def load(self): - """load the index for a given mount-point, then initialize its - fulltext subsystem. This is the routine that will bootstrap - the indexmanager (though create() may have just created it) - """ - pass - - def initialize(self): - """Initialize a new mount point""" - pass - - # Informational - def descriptor(self): - """return a dict with atleast the following keys - 'id' -- the id used to refer explicitly to the mount point - 'title' -- Human readable identifier for the mountpoint - 'uri' -- The uri which triggered the mount - """ - pass - - @property - def id(self): return self.descriptor()['id'] - @property - def title(self): return self.descriptor()['title'] - - # Storage Translation - def localizedName(self, uid=None, content=None, target=None): - """taking any of uid, a content object, or a direct target - filename (which includes all of the relative components under a - store). 
Return the localized filename that should be used _within_
-        the repository for the storage of this content object
-        """
-        pass
-
-import time
-class AsyncCopy:
-    CHUNK_SIZE=65536
-
-    def __init__(self, src, dest, completion):
-        self.src = src
-        self.dest = dest
-        self.completion = completion
-        self.src_fp = -1
-        self.dest_fp = -1
-        self.written = 0
-        self.size = 0
-
-    def _cleanup(self):
-        os.close(self.src_fp)
-        os.close(self.dest_fp)
-
-    def _copy_block(self, user_data=None):
-        try:
-            data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
-            count = os.write(self.dest_fp, data)
-            self.written += len(data)
-
-            # error writing data to file?
-            if count < len(data):
-                logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
-                self._cleanup()
-                self.completion(RuntimeError("Error writing data to destination file"))
-                return False
-
-            # FIXME: emit progress here
-
-            # done?
-            if len(data) < AsyncCopy.CHUNK_SIZE:
-                logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
-                self._cleanup()
-                self.completion(None, self.dest)
-                return False
-        except Exception, err:
-            logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
-            self._cleanup()
-            self.completion(err)
-            return False
-
-        return True
-
-    def start(self):
-        self.src_fp = os.open(self.src, os.O_RDONLY)
-        self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
-
-        stat = os.fstat(self.src_fp)
-        self.size = stat[6]
-
-        logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
-
-        self.tstart = time.time()
-        sid = gobject.idle_add(self._copy_block)
-
-class FileBackingStore(BackingStore):
-    """A backing store that directly maps the storage of content
-    objects onto an available filesystem.
-
-    # Not really true: a change results in the older version keeping
-    # the last content while the new version is created as well; the
-    # old copy is left in its newest state rather than its start
-    # state, if that can work...
-    The semantics of interacting
-    with the repositories mean that the current copy _can_ be edited
-    in place.  It's the actions create/update that create new revisions
-    in the datastore and hence new versions.
-    """
-    STORE_NAME = "store"
-    INDEX_NAME = "index"
-    DESCRIPTOR_NAME = "metainfo"
-
-    def __init__(self, uri, **kwargs):
-        """ FileSystemStore(path=<root of managed storage>)
-        """
-        self.options = kwargs
-        self.local_indexmanager = self.options.get('local_indexmanager', True)
-
-        self.uri = uri
-        self.base = os.path.join(uri, self.STORE_NAME)
-        self.indexmanager = None
-
-        """ Current uid of the user that is calling DataStore.get_filename
-        through dbus.  Needed for security stuff.  It is an instance variable
-        instead of a method parameter because this is less invasive for Update 1.
- """ - self.current_user_id = None - - # source for an idle callback that exports to the file system the - # metadata from the index - self._export_metadata_source = None - - # Informational - def descriptor(self): - """return a dict with atleast the following keys - 'id' -- the id used to refer explicitly to the mount point - 'title' -- Human readable identifier for the mountpoint - 'uri' -- The uri which triggered the mount - """ - # a hidden file with a pickled dict will live in the base - # directory for each storage - fn = os.path.join(self.base, self.DESCRIPTOR_NAME) - desc = None - if os.path.exists(fn): - try: - fp = open(fn, 'r') - desc = pickle.load(fp) - fp.close() - except: - desc = None - if not desc: - # the data isn't there, this could happen for a number of - # reasons (the store isn't writeable) - # or if the information on it was corrupt - # in this case, just create a new one - desc = {'id' : self.uri, - 'uri' : self.uri, - 'title' : self.uri - } - self.create_descriptor(**desc) - - return desc - - - def create_descriptor(self, **kwargs): - # create the information descriptor for this store - # defaults will be created if need be - # passing limited values will leave existing keys in place - kwargs = utils._convert(kwargs) - fn = os.path.join(self.base, self.DESCRIPTOR_NAME) - desc = {} - if os.path.exists(fn): - fp = open(fn, 'r') - try: - desc = pickle.load(fp) - except: - desc = {} - finally: - fp.close() - - desc.update(kwargs) - - if 'id' not in desc: desc['id'] = utils.create_uid() - if 'uri' not in desc: desc['uri'] = self.uri - if 'title' not in desc: desc['title'] = self.uri - - # TODO: Would be better to check if the device is present and - # don't try to update the descriptor file if it's not. - try: - fp = open(fn, 'w') - pickle.dump(desc, fp) - fp.close() - except IOError, e: - logging.error('Unable to write descriptor:\n' + \ - ''.join(traceback.format_exception(*sys.exc_info()))) - - @staticmethod - def parse(uri): - return os.path.isabs(uri) or os.path.isdir(uri) - - def check(self): - if not os.path.exists(self.uri): return False - if not os.path.exists(self.base): return False - return True - - def initialize(self): - if not os.path.exists(self.base): - os.makedirs(self.base) - - # examine options and see what the indexmanager plan is - if self.local_indexmanager: - # create a local storage using the indexmanager - # otherwise we will connect the global manager - # in load - index_name = os.path.join(self.base, self.INDEX_NAME) - options = utils.options_for(self.options, 'indexmanager.') - im = IndexManager() - # This will ensure the fulltext and so on are all assigned - im.bind_to(self) - im.connect(index_name, **options) - - self.create_descriptor(**options) - self.indexmanager = im - - def load(self): - if not self.indexmanager and self.local_indexmanager: - # create a local storage using the indexmanager - # otherwise we will connect the global manager - # in load - index_name = os.path.join(self.base, self.INDEX_NAME) - options = utils.options_for(self.options, 'indexmanager.') - im = IndexManager() - - desc = utils.options_for(self.options, - 'indexmanager.', - invert=True) - if desc: self.create_descriptor(**desc) - - # This will ensure the fulltext and so on are all assigned - im.bind_to(self) - im.connect(index_name) - - self.indexmanager = im - - # Check that all entries have their metadata in the file system. 
- if not os.path.exists(os.path.join(self.base, '.metadata.exported')): - uids_to_export = [] - uids = self.indexmanager.get_all_ids() - - for uid in uids: - if not os.path.exists(os.path.join(self.base, uid + '.metadata')): - uids_to_export.append(uid) - - if uids_to_export: - self._export_metadata_source = gobject.idle_add( - self._export_metadata, uids_to_export) - else: - open(os.path.join(self.base, '.metadata.exported'), 'w').close() - - def _export_metadata(self, uids_to_export): - uid = uids_to_export.pop() - props = self.indexmanager.get(uid).properties - self._store_metadata(uid, props) - return len(uids_to_export) > 0 - - def bind_to(self, datastore): - ## signal from datastore that we are being bound to it - self.datastore = datastore - - def localizedName(self, uid=None, content=None, target=None): - """taking any of uid, a content object, or a direct target - filename (which includes all of the relative components under a - store). Return the localized filename that should be used _within_ - the repository for the storage of this content object - """ - if target: return os.path.join(self.base, target) - elif content: - # see if it expects a filename - fn, ext = content.suggestName() - if fn: return os.path.join(self.base, fn) - if ext: return os.path.join(self.base, "%s.%s" % - (content.id, ext)) - if not uid: uid = content.id - - if uid: - return os.path.join(self.base, uid) - else: - raise ValueError("""Nothing submitted to generate internal - storage name from""") - - def _translatePath(self, uid): - """translate a UID to a path name""" - # paths into the datastore - return os.path.join(self.base, str(uid)) - - def _targetFile(self, uid, target=None, ext=None, env=None): - logging.debug('FileBackingStore._targetFile: %r %r %r %r' % (uid, target, ext, env)) - # paths out of the datastore, working copy targets - path = self._translatePath(uid) - if not os.path.exists(path): - return None - - if target: targetpath = target - else: - targetpath = uid.replace('/', '_').replace('.', '__') - if ext: - if not ext.startswith('.'): ext = ".%s" % ext - targetpath = "%s%s" % (targetpath, ext) - - use_instance_dir = os.path.exists('/etc/olpc-security') and \ - os.getuid() != self.current_user_id - if use_instance_dir: - if not self.current_user_id: - raise ValueError("Couldn't determine the current user uid.") - base = os.path.join(os.environ['HOME'], 'isolation', '1', 'uid_to_instance_dir', - str(self.current_user_id)) - else: - profile = os.environ.get('SUGAR_PROFILE', 'default') - base = os.path.join(os.path.expanduser('~'), '.sugar', profile, 'data') - if not os.path.exists(base): - os.makedirs(base) - - targetpath = os.path.join(base, targetpath) - attempt = 0 - while os.path.exists(targetpath): - # here we look for a non-colliding name - # this is potentially a race and so we abort after a few - # attempts - targetpath, ext = os.path.splitext(targetpath) - - if filename_attempt_pattern.search(targetpath): - targetpath = filename_attempt_pattern.sub('', targetpath) - - attempt += 1 - if attempt > 9: - targetpath = "%s(%s).%s" % (targetpath, time.time(), ext) - break - - targetpath = "%s(%s)%s" % (targetpath, attempt, ext) - - # Try to make the original file readable. This can fail if the file is - # in FAT filesystem. - try: - os.chmod(path, 0604) - except OSError, e: - if e.errno != errno.EPERM: - raise - - # Try to hard link from the original file to the targetpath. This can - # fail if the file is in a different filesystem. Do a symlink instead. 
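[Editor's note] The try/except that follows is the standard cross-device fallback: os.link() raises OSError with errno set to EXDEV when source and target live on different filesystems, in which case a symlink is the best that can be done. The same idiom as a self-contained helper (the function name is mine, not the patch's):

    import errno
    import os

    def link_or_symlink(src, dst):
        # Hard link src to dst, falling back to a symlink across devices.
        try:
            os.link(src, dst)
        except OSError, e:
            if e.errno == errno.EXDEV:   # different filesystem
                os.symlink(src, dst)
            else:
                raise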
- try: - os.link(path, targetpath) - except OSError, e: - if e.errno == errno.EXDEV: - os.symlink(path, targetpath) - else: - raise - - return open(targetpath, 'r') - - def _mapContent(self, uid, fp, path, env=None): - """map a content object and the file in the repository to a - working copy. - """ - # env would contain things like cwd if we wanted to map to a - # known space - - content = self.indexmanager.get(uid) - # we need to map a copy of the content from the backingstore into the - # activities addressable space. - # map this to a rw file - if fp: - target, ext = content.suggestName() - targetfile = self._targetFile(uid, target, ext, env) - content.file = targetfile - - if self.options.get('verify', False): - c = sha.sha() - for line in targetfile: - c.update(line) - fp.seek(0) - if c.hexdigest() != content.checksum: - raise ValueError("Content for %s corrupt" % uid) - return content - - def _writeContent_complete(self, path, completion=None): - self._set_permissions_if_possible(path) - if completion is None: - return path - completion(None, path) - return None - - def _set_permissions_if_possible(self, path): - try: - os.chmod(path, 0604) - except OSError, e: - # This can fail for usb sticks. - if e.errno != errno.EPERM: - raise - - def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None, - completion=None): - """Returns: path of file in datastore (new path if it was copied/moved)""" - content = None - if target: path = target - else: - path = self._translatePath(uid) - - if replace is False and os.path.exists(path): - raise KeyError("objects with path:%s for uid:%s exists" %( - path, uid)) - - if filelike.name != path: - # protection on inplace stores - if completion is None: - bin_copy.bin_copy(filelike.name, path) - self._set_permissions_if_possible(path) - return path - - if can_move: - bin_copy.bin_mv(filelike.name, path) - self._set_permissions_if_possible(path) - return self._writeContent_complete(path, completion) - - # Otherwise, async copy - aco = AsyncCopy(filelike.name, path, completion) - aco.start() - else: - return self._writeContent_complete(path, completion) - - def _checksum(self, filename): - c = sha.sha() - fp = open(filename, 'r') - for line in fp: - c.update(line) - fp.close() - return c.hexdigest() - - # File Management API - def _encode_json(self, metadata, file_path): - if has_cjson: - f = open(file_path, 'w') - f.write(cjson.encode(metadata)) - f.close() - else: - simplejson.dump(metadata, open(file_path, 'w')) - - def _store_metadata(self, uid, props): - t = time.time() - temp_path = os.path.join(self.base, '.temp_metadata') - props = props.copy() - for property_name in model.defaultModel.get_external_properties(): - if property_name in props: - del props[property_name] - self._encode_json(props, temp_path) - path = os.path.join(self.base, uid + '.metadata') - os.rename(temp_path, path) - logging.debug('exported metadata: %r s.' 
% (time.time() - t)) - - def _delete_metadata(self, uid): - path = os.path.join(self.base, uid + '.metadata') - if os.path.exists(path): - os.unlink(path) - - def _create_completion(self, uid, props, completion, exc=None, path=None): - if exc: - completion(exc) - return - try: - # Index the content this time - self.indexmanager.index(props, path) - completion(None, uid) - except Exception, exc: - completion(exc) - - def create_async(self, props, filelike, can_move=False, completion=None): - if completion is None: - raise RuntimeError("Completion must be valid for async create") - uid = self.indexmanager.index(props) - self._store_metadata(uid, props) - props['uid'] = uid - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - self._writeContent(uid, filelike, replace=False, can_move=can_move, - completion=lambda *args: self._create_completion(uid, props, completion, *args)) - else: - completion(None, uid) - - def create(self, props, filelike, can_move=False): - if filelike: - uid = self.indexmanager.index(props) - self._store_metadata(uid, props) - props['uid'] = uid - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - path = self._writeContent(uid, filelike, replace=False, can_move=can_move) - self.indexmanager.index(props, path) - return uid - else: - uid = self.indexmanager.index(props) - self._store_metadata(uid, props) - return uid - - def get(self, uid, env=None, allowMissing=False, includeFile=False): - content = self.indexmanager.get(uid) - if not content: raise KeyError(uid) - path = self._translatePath(uid) - fp = None - # not all content objects have a file - if includeFile and os.path.exists(path): - fp = open(path, 'r') - # now return a Content object from the model associated with - # this file object - content = self._mapContent(uid, fp, path, env) - if fp: - fp.close() - return content - - def _update_completion(self, uid, props, completion, exc=None, path=None): - if exc is not None: - completion(exc) - return - try: - self.indexmanager.index(props, path) - completion() - except Exception, exc: - completion(exc) - - def update_async(self, uid, props, filelike, can_move=False, completion=None): - logging.debug('backingstore.update_async') - if filelike is None: - raise RuntimeError("Filelike must be valid for async update") - if completion is None: - raise RuntimeError("Completion must be valid for async update") - - props['uid'] = uid - self._store_metadata(uid, props) - if filelike: - uid = self.indexmanager.index(props, filelike) - props['uid'] = uid - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - self._writeContent(uid, filelike, can_move=can_move, - completion=lambda *args: self._update_completion(uid, props, completion, *args)) - else: - self.indexmanager.index(props) - completion() - - def update(self, uid, props, filelike=None, can_move=False): - props['uid'] = uid - self._store_metadata(uid, props) - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - path = self._writeContent(uid, filelike, can_move=can_move) - self.indexmanager.index(props, path) - else: - self.indexmanager.index(props) - - def _delete_external_properties(self, uid): - external_properties = model.defaultModel.get_external_properties() - for property_name in external_properties: - 
file_path = os.path.join(self.base, property_name, uid) - if os.path.exists(file_path): - logging.debug('deleting external property: %r' % file_path) - os.unlink(file_path) - - def delete(self, uid, allowMissing=True): - self._delete_external_properties(uid) - self._delete_metadata(uid) - - self.indexmanager.delete(uid) - path = self._translatePath(uid) - if os.path.exists(path): - os.unlink(path) - else: - if not allowMissing: - raise KeyError("object for uid:%s missing" % uid) - - def get_uniquevaluesfor(self, propertyname): - return self.indexmanager.get_uniquevaluesfor(propertyname) - - - def get_external_property(self, doc_id, key): - # external properties default to the following storage - # <repo>/key/uid which is the file containing the external - # data. its contents is returned by this call - # when missing or absent '' is returned - pfile = os.path.join(self.base, key, str(doc_id)) - if os.path.exists(pfile): v = open(pfile, 'r').read() - else: v = '' - return dbus.ByteArray(v) - - - def set_external_property(self, doc_id, key, value): - pdir = os.path.join(self.base, key) - if not os.path.exists(pdir): os.mkdir(pdir) - pfile = os.path.join(pdir, doc_id) - fp = open(pfile, 'w') - fp.write(value) - fp.close() - - - def find(self, query, order_by=None, limit=None, offset=0): - if not limit: limit = 4069 - return self.indexmanager.search(query, start_index=offset, end_index=limit, order_by=order_by) - - def ids(self): - return self.indexmanager.get_all_ids() - - def stop(self): - if self._export_metadata_source is not None: - gobject.source_remove(self._export_metadata_source) - self.indexmanager.stop() - - def complete_indexing(self): - self.indexmanager.complete_indexing() - -class InplaceFileBackingStore(FileBackingStore): - """Like the normal FileBackingStore this Backingstore manages the - storage of files, but doesn't move files into a repository. There - are no working copies. It simply adds index data through its - indexmanager and provides fulltext ontop of a regular - filesystem. It does record its metadata relative to this mount - point. - - This is intended for USB keys and related types of attachable - storage. - """ - - STORE_NAME = ".olpc.store" - - def __init__(self, uri, **kwargs): - # remove the 'inplace:' scheme - uri = uri[len('inplace:'):] - super(InplaceFileBackingStore, self).__init__(uri, **kwargs) - # use the original uri - self.uri = uri - self._walk_source = None - - @staticmethod - def parse(uri): - return uri.startswith("inplace:") - - def check(self): - if not os.path.exists(self.uri): return False - if not os.path.exists(self.base): return False - return True - - - def load(self): - try: - super(InplaceFileBackingStore, self).load() - except xapian.DatabaseCorruptError, e: - # TODO: Try to recover in a smarter way than deleting the base - # dir and reinitializing the index. - - logging.error('Error while trying to load mount point %s: %s. ' \ - 'Will try to renitialize and load again.' % (self.base, e)) - - # Delete the base dir and its contents - for root, dirs, files in os.walk(self.base, topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - os.rmdir(root) - - self.initialize() - self.load() - return - - # now map/update the existing data into the indexes - # but do it async - files_to_check = [] - for dirpath, dirname, filenames in os.walk(self.uri): - if self.base in dirpath: continue - if self.STORE_NAME in dirname: - dirname.remove(self.STORE_NAME) - - # blacklist all the hidden directories - if '/.' 
in dirpath: continue - - for fn in filenames: - # ignore conventionally hidden files - if fn.startswith("."): - continue - files_to_check.append((dirpath, fn)) - - self._walk_source = gobject.idle_add(self._walk, files_to_check) - - def _walk(self, files_to_check): - dirpath, fn = files_to_check.pop() - logging.debug('InplaceFileBackingStore._walk(): %r' % fn) - try: - source = os.path.join(dirpath, fn) - relative = source[len(self.uri)+1:] - - result, count = self.indexmanager.search(dict(filename=relative)) - mime_type = gnomevfs.get_mime_type(source) - stat = os.stat(source) - ctime = datetime.fromtimestamp(stat.st_ctime).isoformat() - mtime = datetime.fromtimestamp(stat.st_mtime).isoformat() - title = os.path.splitext(os.path.split(source)[1])[0] - metadata = dict(filename=relative, - mime_type=mime_type, - ctime=ctime, - mtime=mtime, - title=title) - if not count: - # create a new record - self.create(metadata, source) - else: - # update the object with the new content iif the - # checksum is different - # XXX: what if there is more than one? (shouldn't - # happen) - - # FIXME This is throwing away all the entry metadata. - # Disabled for trial-3. We are not doing indexing - # anyway so it would just update the mtime which is - # not that useful. Also the journal is currently - # setting the mime type before saving the file making - # the mtime check useless. - # - # content = result.next() - # uid = content.id - # saved_mtime = content.get_property('mtime') - # if mtime != saved_mtime: - # self.update(uid, metadata, source) - pass - except Exception, e: - logging.exception('Error while processing %r: %r' % (fn, e)) - - if files_to_check: - return True - else: - self._walk_source = None - return False - - def _translatePath(self, uid): - try: content = self.indexmanager.get(uid) - except KeyError: return None - return os.path.join(self.uri, content.get_property('filename', uid)) - -## def _targetFile(self, uid, target=None, ext=None, env=None): -## # in this case the file should really be there unless it was -## # deleted in place or something which we typically isn't -## # allowed -## # XXX: catch this case and remove the index -## targetpath = self._translatePath(uid) -## return open(targetpath, 'rw') - - # File Management API - def create_async(self, props, filelike, completion, can_move=False): - """Inplace backing store doesn't copy, so no need for async""" - try: - uid = self.create(props, filelike, can_move) - completion(None, uid) - except Exception, exc: - completion(exc) - - def _get_unique_filename(self, suggested_filename): - # Invalid characters in VFAT filenames. 
From - # http://en.wikipedia.org/wiki/File_Allocation_Table - invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\x7F'] - invalid_chars.extend([chr(x) for x in range(0, 32)]) - filename = suggested_filename - for char in invalid_chars: - filename = filename.replace(char, '_') - - # FAT limit is 255, leave some space for uniqueness - max_len = 250 - if len(filename) > max_len: - name, extension = os.path.splitext(filename) - filename = name[0:max_len - extension] + extension - - if os.path.exists(os.path.join(self.uri, filename)): - i = 1 - while len(filename) <= max_len: - name, extension = os.path.splitext(filename) - filename = name + '_' + str(i) + extension - if not os.path.exists(os.path.join(self.uri, filename)): - break - i += 1 - - if len(filename) > max_len: - filename = None - - return filename - - def create(self, props, filelike, can_move=False): - # the file would have already been changed inplace - # don't touch it - proposed_name = None - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - # usually with USB drives and the like the file we are - # indexing is already on it, however in the case of moving - # files to these devices we need to detect this case and - # place the file - proposed_name = props.get('filename', None) - if proposed_name is None: - proposed_name = props.get('suggested_filename', None) - if proposed_name is None: - proposed_name = os.path.split(filelike.name)[1] - proposed_name = self._get_unique_filename(proposed_name) - - # record the name before qualifying it to the store - props['filename'] = proposed_name - proposed_name = os.path.join(self.uri, proposed_name) - - uid = self.indexmanager.index(props) - props['uid'] = uid - path = filelike - if proposed_name and not os.path.exists(proposed_name): - path = self._writeContent(uid, filelike, replace=False, target=proposed_name) - self.indexmanager.index(props, path) - return uid - - def get(self, uid, env=None, allowMissing=False): - content = self.indexmanager.get(uid) - if not content: raise KeyError(uid) - return content - - def update_async(self, uid, props, filelike, completion, can_move=False): - try: - self.update(uid, props, filelike, can_move) - completion() - except Exception, exc: - completion(exc) - - def update(self, uid, props, filelike=None, can_move=False): - # the file would have already been changed inplace - # don't touch it - props['uid'] = uid - - proposed_name = None - if filelike: - if isinstance(filelike, basestring): - # lets treat it as a filename - filelike = open(filelike, "r") - filelike.seek(0) - # usually with USB drives and the like the file we are - # indexing is already on it, however in the case of moving - # files to these devices we need to detect this case and - # place the file - proposed_name = props.get('filename', None) - if not proposed_name: - proposed_name = os.path.split(filelike.name)[1] - # record the name before qualifying it to the store - props['filename'] = proposed_name - proposed_name = os.path.join(self.uri, proposed_name) - - path = filelike - if proposed_name: - path = self._writeContent(uid, filelike, replace=True, target=proposed_name) - self.indexmanager.index(props, path) - - def delete(self, uid): - self._delete_external_properties(uid) - - c = self.indexmanager.get(uid) - path = c.get_property('filename', None) - self.indexmanager.delete(uid) - - if path: - path = os.path.join(self.uri, path) - if os.path.exists(path): - os.unlink(path) - - 
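[Editor's note] One remark on _get_unique_filename() above: the truncation line filename = name[0:max_len - extension] + extension subtracts a string from an integer and would raise TypeError; it presumably means max_len - len(extension). A corrected sketch of the same VFAT-safe renaming logic (names are illustrative):

    import os

    VFAT_INVALID = '/\\:*?"<>|\x7f' + ''.join(chr(x) for x in range(32))

    def vfat_safe_name(suggested, max_len=250):
        # replace characters FAT cannot store, then trim to max_len
        name = ''.join('_' if c in VFAT_INVALID else c for c in suggested)
        if len(name) > max_len:
            stem, ext = os.path.splitext(name)
            name = stem[:max_len - len(ext)] + ext
        return name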
def stop(self): - if self._walk_source is not None: - gobject.source_remove(self._walk_source) - self.indexmanager.stop(force=True) - - def complete_indexing(self): - # TODO: Perhaps we should move the inplace indexing to be sync here? - self.indexmanager.complete_indexing() - diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py deleted file mode 100644 index 6cf7036..0000000 --- a/src/olpc/datastore/bin_copy.py +++ /dev/null @@ -1,27 +0,0 @@ -import os, subprocess - - -def bin_copy(src, dest): - try: - subprocess.check_call(['/bin/cp', src, dest]) - except subprocess.CalledProcessError: - raise OSError("Copy failed %s %s" % (src, dest)) - -def bin_mv(src, dest): - try: - subprocess.check_call(['/bin/mv', src, dest]) - except subprocess.CalledProcessError: - raise OSError("Move failed %s %s" % (src, dest)) - -if __name__ == "__main__": - import sys - if len(sys.argv) != 3: - raise SystemExit("usage: <src> <dest>") - - src, dest = sys.argv[1:] - - if not os.path.exists(src): raise OSError("missing src file") - - bin_copy(src, dest) - - diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py deleted file mode 100644 index 75f7568..0000000 --- a/src/olpc/datastore/converter.py +++ /dev/null @@ -1,165 +0,0 @@ -""" -olpc.datastore.converter -~~~~~~~~~~~~~~~~~~~~ -Convert binary formats to unicode text for indexing. - -Normally we'd make heavy reliance on 3rd party tools to do -conversion. In the olpc use-case we want to minimize such -dependencies. As such we make a minimal attempt to extract what text -we can. - -""" - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - -from olpc.datastore.utils import Singleton -import codecs -import logging -import os -import subprocess -import sys -import tempfile -import gnomevfs - -def guess_mimetype(filename): - fn = os.path.abspath(filename) - mimetype = gnomevfs.get_mime_type(fn) - return mimetype - -class subprocessconverter(object): - """Process a command. Collect the output - - commands will have the following variables available to them for - substitution. 'source' is required and is the input file. - 'target' is optional, but if its omitted the subprocessconverter - must supply an implict_target(source) method which returns the - name of the expected output. - - A file object opened for reading will be returned to be passed to - the indexer. 
- - %(source)s - %(target)s - - pdftotext %(source)s %s(target)s - """ - def __init__(self, cmd, find_target=None): - self.raw = cmd - self.require_target = False - self.find_target = find_target - - if '%(source)s' not in cmd: - raise ValueError("doesn't handle source") - if '%(target)s' not in cmd: - if not callable(find_target): - raise ValueError("no way of locating conversion target") - self.require_target = True - - def verify(self): - """should this converter be used?""" - return os.path.exists(self.raw.split()[0]) - - def __call__(self, filename): - data = {} - data['source'] = filename - if self.require_target: - # XXX: methods that return something bad here - # will result in the wrong thing being unlinked - target = data['target'] = self.find_target(filename) - else: - target = data['target'] = tempfile.mkstemp()[1] - cmd = self.raw % data - - try: - cmd = cmd.split() - # the stderr capture here will hide glib error messages - # from converters which shouldn't be generating output anyway - retcode = subprocess.call(cmd, stderr=subprocess.PIPE) - if retcode: return None - return codecs.open(target, 'r', 'utf-8') - except UnicodeDecodeError: - # The data was an unknown type but couldn't be understood - # as text so we don't attempt to index it. This most - # likely means its just an unknown binary format. - return None - finally: - # we unlink the file as its already been opened for - # reading - if os.path.exists(target): - os.unlink(target) - -class noop(object): - def verify(self): return True - def __call__(self, filename): - return codecs.open(filename, 'r', 'utf-8') - -class Converter(object): - __metaclass__ = Singleton - def __init__(self): - # maps both extension -> plugin - # and mimetype -> plugin - self._converters = {} - self._default = None - self.logger = logging.getLogger('org.laptop.sugar.Indexer') - - def registerConverter(self, ext_or_mime, plugin): - if plugin.verify(): - self._converters[ext_or_mime] = plugin - if self._default is None: self._default = plugin - - def __call__(self, filename, encoding=None, mimetype=None): - """Convert filename's content to utf-8 encoded text.""" - #encoding is passed its the known encoding of the - #contents. When None is passed the encoding is guessed which - #can result in unexpected or no output. 
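[Editor's note] For context on the converter machinery above: subprocessconverter substitutes %(source)s and %(target)s into a command template, runs it, and hands the target back opened as UTF-8. A stripped-down sketch of that flow using the pdftotext command registered further down (the helper name is mine; it keeps the original's stderr=PIPE behaviour):

    import codecs
    import os
    import subprocess
    import tempfile

    def pdf_to_text(source):
        target = tempfile.mkstemp()[1]
        try:
            cmd = ['/usr/bin/pdftotext', '-nopgbrk', '-enc', 'UTF-8',
                   source, target]
            if subprocess.call(cmd, stderr=subprocess.PIPE):
                return None                 # non-zero exit: skip indexing
            return codecs.open(target, 'r', 'utf-8').read()
        finally:
            if os.path.exists(target):
                os.unlink(target)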
- if mimetype: mt = mimetype - else: mt = guess_mimetype(filename) - maintype, subtype = mt.split('/',1) - - converter = self._converters.get(mt) - if not converter: - converter = self._default - # it was an image or an unknown application - if maintype in ['image', 'application', 'audio', 'video'] or \ - subtype in ['x-trash', 'x-python-bytecode',]: - converter = None - - if converter: - try: return converter(filename) - except: - logging.debug("Binary to Text failed: %s %s" % - (mt, filename), exc_info=sys.exc_info()) - - return None - -# our global instance -converter = Converter() - -# TXT -txt = noop() -converter.registerConverter('.txt', txt) -converter.registerConverter('.html', txt) -converter.registerConverter('text/plain', txt) -converter.registerConverter('text/html', txt) - -# PDF -pdf2txt = subprocessconverter('/usr/bin/pdftotext -nopgbrk -enc UTF-8 %(source)s %(target)s') -converter.registerConverter('.pdf', pdf2txt) -converter.registerConverter('application/pdf', pdf2txt) - - -# DOC -def find_by_ext(filename, ext="txt"): - return "%s.%s" % (os.path.splitext(filename)[0], ext) - -doctotext = subprocessconverter('/usr/bin/abiword -t txt %(source)s', find_by_ext) -converter.registerConverter('.doc', doctotext) -converter.registerConverter('application/msword', doctotext) - -# ODT -converter.registerConverter('.odt', doctotext) -converter.registerConverter('application/vnd.oasis.opendocument.text', doctotext) - diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index a15d5cf..8186960 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -1,22 +1,35 @@ -""" -olpc.datastore.datastore -~~~~~~~~~~~~~~~~~~~~~~~~ -the datastore facade - -""" - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - - +# Copyright (C) 2008, One Laptop Per Child +# Based on code Copyright (C) 2007, ObjectRealms, LLC +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
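Before this commit, callers obtained indexable text through the module-global converter registry deleted above; a usage sketch against that removed API, with a hypothetical file path:

# Sketch only: dispatch on extension/mimetype and get back a utf-8 text
# stream suitable for the indexer, or None for opaque binaries.
from olpc.datastore.converter import converter

fileobj = converter('/tmp/notes.txt', mimetype='text/plain')
if fileobj is not None:
    text = fileobj.read()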
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import logging -import dbus.service -import dbus.mainloop.glib +import uuid +import time +import os -from olpc.datastore import utils +import dbus +import gobject + +from olpc.datastore import layoutmanager +from olpc.datastore import migration +from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT +from olpc.datastore.metadatastore import MetadataStore +from olpc.datastore.indexstore import IndexStore +from olpc.datastore.filestore import FileStore +from olpc.datastore.optimizer import Optimizer # the name used by the logger DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' @@ -27,468 +40,259 @@ DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" logger = logging.getLogger(DS_LOG_CHANNEL) -DEFAULT_LIMIT = 65536 - class DataStore(dbus.service.Object): - + """D-Bus API and logic for connecting all the other components. + """ def __init__(self, **options): - self.options = options - self.backends = [] - self.mountpoints = {} - self.root = None - - # global handle to the main look - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - session_bus = dbus.SessionBus() - - self.bus_name = dbus.service.BusName(DS_SERVICE, - bus=session_bus, - replace_existing=False, - allow_replacement=False) - dbus.service.Object.__init__(self, self.bus_name, DS_OBJECT_PATH) + bus_name = dbus.service.BusName(DS_SERVICE, + bus=dbus.SessionBus(), + replace_existing=False, + allow_replacement=False) + dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) - - #### - ## Backend API - ## register a set of datastore backend factories which will manage - ## storage - def registerBackend(self, backendClass): - self.backends.append(backendClass) - - ## MountPoint API - #@utils.sanitize_dbus - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="sa{sv}", - out_signature='s') - def mount(self, uri, options=None): - """(re)Mount a new backingstore for this datastore. - Returns the mountpoint id or an empty string to indicate failure. - """ - # on some media we don't want to write the indexes back to the - # medium (maybe an SD card for example) and we'd want to keep - # that on the XO itself. In these cases their might be very - # little identifying information on the media itself. 
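The rewrite keeps the same bus name and object path (DS_SERVICE, DS_OBJECT_PATH), so existing clients keep working unchanged; a minimal client-side sketch, assuming a running datastore service:

import dbus

bus = dbus.SessionBus()
obj = bus.get_object('org.laptop.sugar.DataStore',
                     '/org/laptop/sugar/DataStore')
datastore = dbus.Interface(obj, dbus_interface='org.laptop.sugar.DataStore')

# create(props, file_path, transfer_ownership) -> uid
uid = datastore.create({'title': 'example entry'}, '', False)
print datastore.get_properties(uid)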
- uri = str(uri) - - _options = utils._convert(options) - if _options is None: _options = {} - - mp = self.connect_backingstore(uri, **_options) - if not mp: return '' - if mp.id in self.mountpoints: - self.mountpoints[mp.id].stop() - - mp.bind_to(self) - self.mountpoints[mp.id] = mp - if self.root is None: - self.root = mp - - self.Mounted(mp.descriptor()) - return mp.id - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="", - out_signature="aa{sv}") - def mounts(self): - """return a list of mount point descriptiors where each - descriptor is a dict containing atleast the following keys: - 'id' -- the id used to refer explicitly to the mount point - 'title' -- Human readable identifier for the mountpoint - 'uri' -- The uri which triggered the mount - """ - return [mp.descriptor() for mp in self.mountpoints.itervalues()] - - #@utils.sanitize_dbus - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="s", - out_signature="") - def unmount(self, mountpoint_id): - """Unmount a mountpoint by id""" - if mountpoint_id not in self.mountpoints: return - mp = self.mountpoints[mountpoint_id] - mp.stop() - - del self.mountpoints[mountpoint_id] - self.Unmounted(mp.descriptor()) + layout_manager = layoutmanager.get_instance() + if layout_manager.get_version() == 0: + migration.migrate_from_0() + layout_manager.set_version(1) + layout_manager.index_updated = False - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Mounted(self, descriptior): - """indicates that a new backingstore has been mounted by the - datastore. Returns the mount descriptor, like mounts()""" - pass + self._metadata_store = MetadataStore() - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Unmounted(self, descriptor): - """indicates that a new backingstore has been mounted by the - datastore. Returns the mount descriptor, like mounts()""" - pass - - - ### End Mount Points - - ### Backup support - def pause(self, mountpoints=None): - """ Deprecated. """ - - def unpause(self, mountpoints=None): - """ Deprecated. """ - ### End Backups - - def connect_backingstore(self, uri, **kwargs): - """ - connect to a new backing store - - @returns: Boolean for success condition - """ - bs = None - for backend in self.backends: - if backend.parse(uri) is True: - bs = backend(uri, **kwargs) - bs.initialize_and_load() - # The backingstore should be ready to run - break - return bs - - - def _resolveMountpoint(self, mountpoint=None): - if isinstance(mountpoint, dict): - mountpoint = mountpoint.pop('mountpoint', None) - - if mountpoint is not None: - # this should be the id of a mount point - mp = self.mountpoints[mountpoint] + self._index_store = IndexStore() + try: + self._index_store.open_index() + except Exception, e: + logging.error('Failed to open index, will rebuild: %r', e) + layout_manager.index_updated = False + self._index_store.remove_index() + self._index_store.open_index() + + self._file_store = FileStore() + + if not layout_manager.index_updated: + logging.debug('Index is not up-to-date, will update') + self._rebuild_index() + + self._optimizer = Optimizer(self._file_store, self._metadata_store) + + def _rebuild_index(self): + uids = layoutmanager.get_instance().find_all() + logging.debug('Going to update the index with uids %r' % uids) + gobject.idle_add(lambda: self.__rebuild_index_cb(uids), + priority=gobject.PRIORITY_LOW) + + def __rebuild_index_cb(self, uids): + if uids: + uid = uids.pop() + + logging.debug('Updating entry %r in index. %d to go.' 
% \ + (uid, len(uids))) + + if not self._index_store.contains(uid): + props = self._metadata_store.retrieve(uid) + self._index_store.store(uid, props) + + if not uids: + logging.debug('Finished updating index.') + layoutmanager.get_instance().index_updated = True + return False else: - # the first one is the default - mp = self.root - return mp + return True - def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None): - logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid)) + def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \ + (async_cb, async_err_cb, uid, exc)) if exc is not None: async_err_cb(exc) return self.Created(uid) + self._optimizer.optimize(uid) logger.debug("created %s" % uid) async_cb(uid) - # PUBLIC API - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}sb', out_signature='s', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) - def create(self, props, filelike=None, transfer_ownership=False, async_cb=None, async_err_cb=None): - """create a new entry in the datastore. If a file is passed it - will be consumed by the datastore. Because the repository has - a checkin/checkout model this will create a copy of the file - in the repository. Changes to this file will not automatically - be be saved. Rather it is recorded in its current state. - - When many backing stores are associated with a datastore - new objects are created in the first datastore. More control - over this process can come at a later time. - """ - mp = self._resolveMountpoint(props) - mp.create_async(props, filelike, can_move=transfer_ownership, - completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args)) + def create(self, props, file_path, transfer_ownership, + async_cb, async_err_cb): + uid = str(uuid.uuid4()) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Created(self, uid): pass - - def _single_search(self, mountpoint, query, order_by, limit, offset): - results, count = mountpoint.find(query.copy(), order_by, - limit + offset, offset) - return list(results), count, 1 - - def _multiway_search(self, query, order_by=None, limit=None, offset=None): - mountpoints = query.pop('mountpoints', self.mountpoints) - mountpoints = [self.mountpoints[str(m)] for m in mountpoints] - - if len(mountpoints) == 1: - # Fast path the single mountpoint case - return self._single_search(mountpoints[0], query, - order_by, limit, offset) - - results = [] - # XXX: the merge will become *much* more complex in when - # distributed versioning is implemented. 
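__rebuild_index_cb above drains its uid list one entry per idle callback so a long index rebuild cannot starve the D-Bus main loop; the shape of that pattern, reduced to a standalone sketch:

import gobject

def drain(queue):
    # Handle one item per idle callback; returning True keeps the
    # callback scheduled, returning False removes it.
    if queue:
        print 'processing', queue.pop()
    return bool(queue)

queue = ['a', 'b', 'c']
# Assumes a gobject main loop is already running, as in the daemon.
gobject.idle_add(lambda: drain(queue), priority=gobject.PRIORITY_LOW)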
- # collect - # some queries mutate the query-dict so we pass a copy each - # time - for mp in mountpoints: - result, count = mp.find(query.copy(), order_by, limit) - results.append(result) - - # merge - d = {} - for res in results: - for hit in res: - existing = d.get(hit.id) - if not existing or \ - existing.get_property('mtime') < hit.get_property('mtime'): - # XXX: age/version check - d[hit.id] = hit - return d, len(d), len(results) + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + self._metadata_store.store(uid, props) + self._index_store.store(uid, props) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._create_completion_cb(async_cb, + async_err_cb, + uid, + *args)) - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='as') - def ids(self, mountpoint=None): - """return all the ids of objects living on a given - mountpoint""" - if str(mountpoint) == "": mountpoint=None - mp = self._resolveMountpoint(mountpoint) - return mp.ids() - - - #@utils.sanitize_dbus - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}as', - out_signature='aa{sv}u') - def find(self, query=None, properties=None, **kwargs): - """find(query) - takes a dict of parameters and returns data in the following - format + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Created(self, uid): + pass - (results, count) + def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \ + (async_cb, async_err_cb, exc)) + if exc is not None: + async_err_cb(exc) + return - where results are: - [ {props}, {props}, ... ] + self.Updated(uid) + self._optimizer.optimize(uid) + logger.debug("updated %s" % uid) + async_cb() - which is to be read, results is an ordered list of property - dicts, akin to what is returned from get_properties. 'uid' is - included in the properties dict as well and is the unique - identifier used in subsequent calls to refer to that object. + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def update(self, uid, props, file_path, transfer_ownership, + async_cb, async_err_cb): + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + + self._metadata_store.store(uid, props) + self._index_store.store(uid, props) + + if os.path.exists(self._file_store.get_file_path(uid)) and \ + (not file_path or os.path.exists(file_path)): + self._optimizer.remove(uid) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._update_completion_cb(async_cb, + async_err_cb, + uid, + *args)) - special keywords in the query that are supported are more - fully documented in the query.py::find method docstring. + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Updated(self, uid): + pass - The 'include_files' keyword will trigger the availability of - user accessible files. Because these are working copies we - don't want to generate them unless needed. In the case the - the full properties set matches doing the single roundtrip - to start an activity makes sense. 
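Clients need not poll after mutations: the service emits Created and Updated (declared here) and Deleted (further down) with the affected uid. A minimal listener sketch:

import dbus
import dbus.mainloop.glib
import gobject

dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()

def created_cb(uid):
    print 'entry created:', uid

bus.add_signal_receiver(created_cb, signal_name='Created',
                        dbus_interface='org.laptop.sugar.DataStore')
gobject.MainLoop().run()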
+ @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}as', + out_signature='aa{sv}u') + def find(self, query, properties): + t = time.time() - To order results by a given property you can specify: - >>> ds.find(order_by=['author', 'title']) + if not layoutmanager.get_instance().index_updated: + logging.warning('Index updating, returning all entries') - Order by must be a list of property names given in the order - of decreasing precedence. + uids = layoutmanager.get_instance().find_all() + count = len(uids) - """ - # only goes to the primary now. Punting on the merge case - if isinstance(query, dict): - kwargs.update(query) + offset = query.get('offset', 0) + limit = query.get('limit', MAX_QUERY_LIMIT) + uids = uids[offset:offset + limit] else: - if 'query' not in kwargs: - kwargs['query'] = query - - include_files = kwargs.pop('include_files', False) - order_by = kwargs.pop('order_by', []) - - # XXX: this is a workaround, deal properly with n backends - limit = kwargs.pop('limit', DEFAULT_LIMIT) - offset = kwargs.pop('offset', 0) - - # distribute the search to all the mountpoints unless a - # backingstore id set is specified - # backends may be able to return sorted results, if there is - # only a single backend in the query we can use pre-sorted - # results directly - results, count, results_from = self._multiway_search(kwargs, - order_by, - limit, offset) + try: + uids, count = self._index_store.find(query) + except Exception, e: + logging.error('Failed to query index, will rebuild: %r', e) + layoutmanager.get_instance().index_updated = False + self._index_store.close_index() + self._index_store.remove_index() + self._index_store.open_index() + self._rebuild_index() + + entries = [] + for uid in uids: + metadata = self._metadata_store.retrieve(uid, properties) + # Hack because the current journal expects the mountpoint property + # to be present. + metadata['mountpoint'] = '1' + entries.append(metadata) + logger.debug('find(): %r' % (time.time() - t)) + return entries, count - - # ordering is difficult when we are dealing with sets from - # more than one source. The model is this. - # order by the primary (first) sort criteria, then do the rest - # in post processing. This allows use to assemble partially - # database sorted results from many sources and quickly - # combine them. - if results_from > 1: - if order_by: - # resolve key names to columns - if isinstance(order_by, basestring): - order_by = [o.strip() for o in order_by.split(',')] - - if not isinstance(order_by, list): - logger.debug("bad query, order_by should be a list of property names") - order_by = None - - # generate a sort function based on the complete set of - # ordering criteria which includes the primary sort - # criteria as well to keep it stable.
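With the Xapian-backed find() above, a query is a plain dict and paging is explicit; reusing the datastore proxy from the client sketch earlier:

# Sketch only: full-text search with paging; returns the matching
# metadata dicts plus the total match count.
results, count = datastore.find({'query': 'fish', 'offset': 0, 'limit': 20},
                                ['uid', 'title', 'mime_type'])
for metadata in results:
    print metadata['uid'], metadata.get('title', '')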
- def comparator(a, b): - # we only sort on properties so - for criteria in order_by: - mode = 1 # ascending - if criteria.startswith('-'): - mode = -1 - criteria = criteria[1:] - pa = a.get_property(criteria, None) - pb = b.get_property(criteria, None) - r = cmp(pa, pb) * mode - if r != 0: return r - return 0 - - - r = results.values() - r.sort(comparator) - results = r - - results = results[offset:limit+offset] - else: - results = results.values() - - d = [] - c = 0 - for r in results: - props = r.properties - props['uid'] = r.id - props['mountpoint'] = r.backingstore.id - props['filename'] = '' - d.append(props) - - if properties: - for name in props.keys(): - if name not in properties: - del props[name] - c+= 1 - if limit and c > limit: break - - return (d, count) - - def get(self, uid): - mp = self._resolveMountpoint() - c = None - try: - c = mp.get(uid) - if c: return c - except KeyError: - pass - - if not c: - for mp in self.mountpoints.itervalues(): - try: - c = mp.get(uid) - if c: break - except KeyError: - continue - return c - - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='s', sender_keyword='sender') def get_filename(self, uid, sender=None): - content = self.get(uid) - if content: - # Assign to the backing store the uid of the process that called - # this method. This is needed for copying the file in the right - # place. - backingstore = content.backingstore - backingstore.current_user_id = dbus.Bus().get_unix_user(sender) - try: - # Retrieving the file path for the file will cause the file to be - # copied or linked to a directory accessible by the caller. - file_path = content.filename - except AttributeError: - file_path = '' - finally: - backingstore.current_user_id = None - return file_path - - #@utils.sanitize_dbus + user_id = dbus.Bus().get_unix_user(sender) + return self._file_store.retrieve(uid, user_id) + @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='a{sv}') def get_properties(self, uid): - content = self.get(uid) - props = content.properties - props['mountpoint'] = content.backingstore.id - return props + metadata = self._metadata_store.retrieve(uid) + # Hack because the current journal expects the mountpoint property to be + # present. + metadata['mountpoint'] = '1' + return metadata @dbus.service.method(DS_DBUS_INTERFACE, in_signature='sa{sv}', out_signature='as') def get_uniquevaluesfor(self, propertyname, query=None): - propertyname = str(propertyname) - - if not query: query = {} - mountpoints = query.pop('mountpoints', self.mountpoints) - mountpoints = [self.mountpoints[str(m)] for m in mountpoints] - results = set() - - for mp in mountpoints: - result = mp.get_uniquevaluesfor(propertyname) - results = results.union(result) - return results - - def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None): - logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc)) - if exc is not None: - async_err_cb(exc) - return - - self.Updated(content.id) - logger.debug("updated %s" % content.id) - async_cb() - - #@utils.sanitize_dbus - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}sb', - out_signature='', - async_callbacks=('async_cb', 'async_err_cb'), - byte_arrays=True) - def update(self, uid, props, filelike=None, transfer_ownership=False, - async_cb=None, async_err_cb=None): - """Record the current state of the object checked out for a - given uid. If contents have been written to another file for - example. 
You must create it - """ - content = self.get(uid) - mountpoint = props.pop('mountpoint', None) - content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership, - completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args)) - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Updated(self, uid): pass + if propertyname != 'activity': + raise ValueError('Only ''activity'' is a supported property name') + if query: + raise ValueError('The query parameter is not supported') + if layoutmanager.get_instance().index_updated: + return self._index_store.get_activities() + else: + logging.warning('Index updating, returning an empty list') + return [] - #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='') def delete(self, uid): - content = self.get(uid) - if content: - content.backingstore.delete(uid) + self._optimizer.remove(uid) + + self._index_store.delete(uid) + self._file_store.delete(uid) + self._metadata_store.delete(uid) + + entry_path = layoutmanager.get_instance().get_entry_path(uid) + os.removedirs(entry_path) + self.Deleted(uid) logger.debug("deleted %s" % uid) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Deleted(self, uid): pass + def Deleted(self, uid): + pass def stop(self): """shutdown the service""" + self._index_store.close_index() self.Stopped() - self._connection.get_connection()._unregister_object_path(DS_OBJECT_PATH) - for mp in self.mountpoints.values(): mp.stop() - @dbus.service.signal(DS_DBUS_INTERFACE) - def Stopped(self): pass + def Stopped(self): + pass @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='', - out_signature='') - def complete_indexing(self): - """Block waiting for all queued indexing operations to - complete. Used mostly in testing""" - for mp in self.mountpoints.itervalues(): - mp.complete_indexing() - + in_signature="sa{sv}", + out_signature='s') + def mount(self, uri, options=None): + return '' + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="", + out_signature="aa{sv}") + def mounts(self): + return [{'id': 1}] + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="s", + out_signature="") + def unmount(self, mountpoint_id): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") + def Mounted(self, descriptior): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") + def Unmounted(self, descriptor): + pass + diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py new file mode 100644 index 0000000..9662885 --- /dev/null +++ b/src/olpc/datastore/filestore.py @@ -0,0 +1,205 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
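The new filestore.py below leans on an errno.EXDEV fallback twice: os.rename falls back to a chunked copy, and os.link falls back to os.symlink, since both calls fail across filesystem boundaries. The bare pattern, as a standalone sketch with hypothetical paths:

import errno
import os
import shutil

def move_or_copy(src, dest):
    # Prefer an atomic rename; fall back to copy-and-unlink when src
    # and dest live on different filesystems (EXDEV).
    try:
        os.rename(src, dest)
    except OSError, e:
        if e.errno == errno.EXDEV:
            shutil.copy(src, dest)
            os.unlink(src)
        else:
            raise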
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import errno +import logging + +import gobject + +from olpc.datastore import layoutmanager + +class FileStore(object): + """Handle the storage of one file per entry. + """ + # TODO: add protection against store and retrieve operations on entries + # that are being processed async. + + def store(self, uid, file_path, transfer_ownership, completion_cb): + """Store a file for a given entry. + + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + destination_path = os.path.join(dir_path, 'data') + if file_path: + if not os.path.isfile(file_path): + raise ValueError('No file at %r' % file_path) + if transfer_ownership: + try: + logging.debug('FileStore moving from %r to %r' % \ + (file_path, destination_path)) + os.rename(file_path, destination_path) + completion_cb() + except OSError, e: + if e.errno == errno.EXDEV: + self._async_copy(file_path, destination_path, + completion_cb) + else: + raise + else: + self._async_copy(file_path, destination_path, completion_cb) + elif not file_path and os.path.exists(destination_path): + os.remove(destination_path) + completion_cb() + else: + logging.debug('FileStore: Nothing to do') + completion_cb() + + def _async_copy(self, file_path, destination_path, completion_cb): + """Start copying a file asynchronously. + + """ + logging.debug('FileStore copying from %r to %r' % \ + (file_path, destination_path)) + async_copy = AsyncCopy(file_path, destination_path, completion_cb) + async_copy.start() + + def retrieve(self, uid, user_id): + """Place the file associated with a given entry into a directory where the + user can read it. The caller is responsible for deleting this file. + + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + file_path = os.path.join(dir_path, 'data') + if not os.path.exists(file_path): + return '' + + use_instance_dir = os.path.exists('/etc/olpc-security') and \ + os.getuid() != user_id + if use_instance_dir: + if not user_id: + raise ValueError('Could not determine the current user uid.') + destination_dir = os.path.join(os.environ['HOME'], 'isolation', '1', + 'uid_to_instance_dir', str(user_id)) + else: + profile = os.environ.get('SUGAR_PROFILE', 'default') + destination_dir = os.path.join(os.path.expanduser('~'), '.sugar', + profile, 'data') + if not os.path.exists(destination_dir): + os.makedirs(destination_dir) + + destination_path = os.path.join(destination_dir, uid) + + # Try to make the original file readable. This can fail if the file is + # on a FAT filesystem. + try: + os.chmod(file_path, 0604) + except OSError, e: + if e.errno != errno.EPERM: + raise + + # Try to hard link from the original file to the target path. This can + # fail if the file is on a different filesystem. Do a symlink instead. + try: + os.link(file_path, destination_path) + except OSError, e: + if e.errno == errno.EXDEV: + os.symlink(file_path, destination_path) + else: + raise + + return destination_path + + def get_file_path(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + return os.path.join(dir_path, 'data') + + def delete(self, uid): + """Remove the file associated with a given entry.
+ + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + file_path = os.path.join(dir_path, 'data') + if os.path.exists(file_path): + os.remove(file_path) + + def hard_link_entry(self, new_uid, existing_uid): + existing_file = os.path.join( + layoutmanager.get_instance().get_entry_path(existing_uid), + 'data') + new_file = os.path.join( + layoutmanager.get_instance().get_entry_path(new_uid), + 'data') + + logging.debug('removing %r' % new_file) + os.remove(new_file) + + logging.debug('hard linking %r -> %r' % (new_file, existing_file)) + os.link(existing_file, new_file) + +class AsyncCopy(object): + """Copy a file in chunks in the idle loop. + + """ + CHUNK_SIZE = 65536 + + def __init__(self, src, dest, completion): + self.src = src + self.dest = dest + self.completion = completion + self.src_fp = -1 + self.dest_fp = -1 + self.written = 0 + self.size = 0 + + def _cleanup(self): + os.close(self.src_fp) + os.close(self.dest_fp) + + def _copy_block(self, user_data=None): + try: + data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE) + count = os.write(self.dest_fp, data) + self.written += len(data) + + # error writing data to file? + if count < len(data): + logging.error('AC: Error writing %s -> %s: wrote less than ' + 'expected' % (self.src, self.dest)) + self._cleanup() + self.completion(RuntimeError( + 'Error writing data to destination file')) + return False + + # FIXME: emit progress here + + # done? + if len(data) < AsyncCopy.CHUNK_SIZE: + self._cleanup() + self.completion(None) + return False + except Exception, err: + logging.error("AC: Error copying %s -> %s: %r" % \ + (self.src, self.dest, err)) + self._cleanup() + self.completion(err) + return False + + return True + + def start(self): + self.src_fp = os.open(self.src, os.O_RDONLY) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, + 0644) + + stat = os.fstat(self.src_fp) + self.size = stat[6] + + gobject.idle_add(self._copy_block) + diff --git a/src/olpc/datastore/indexstore.py b/src/olpc/datastore/indexstore.py new file mode 100644 index 0000000..ec97b4a --- /dev/null +++ b/src/olpc/datastore/indexstore.py @@ -0,0 +1,234 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging +import time +import os + +import gobject +import xapian +from xapian import WritableDatabase, Document, Enquire, Query, QueryParser + +from olpc.datastore import layoutmanager +from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT + +_VALUE_UID = 0 +_VALUE_TIMESTAMP = 1 +_VALUE_ACTIVITY_ID = 2 +_VALUE_MIME_TYPE = 3 +_VALUE_ACTIVITY = 4 +_VALUE_KEEP = 5 + +_PREFIX_UID = 'Q' +_PREFIX_ACTIVITY = 'A' +_PREFIX_MIME_TYPE = 'M' + +# Force a flush every _n_ changes to the db +_FLUSH_THRESHOLD = 20 + +# Force a flush after _n_ seconds since the last change to the db +_FLUSH_TIMEOUT = 60 + +_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview'] + +_MAX_RESULTS = int(2 ** 31 - 1) + +class IndexStore(object): + """Index metadata and provide rich query facilities on it. + """ + def __init__(self): + self._database = None + self._flush_timeout = None + self._pending_writes = 0 + + def open_index(self): + index_path = layoutmanager.get_instance().get_index_path() + self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN) + + def close_index(self): + self._database.flush() + self._database = None + + def remove_index(self): + index_path = layoutmanager.get_instance().get_index_path() + for f in os.listdir(index_path): + os.remove(os.path.join(index_path, f)) + + def contains(self, uid): + postings = self._database.postlist(_PREFIX_UID + uid) + try: + postlist_item = postings.next() + except StopIteration: + return False + return True + + def store(self, uid, properties): + document = Document() + document.add_term(_PREFIX_UID + uid) + document.add_term(_PREFIX_ACTIVITY + properties['activity']) + document.add_term(_PREFIX_MIME_TYPE + properties['mime_type']) + + document.add_value(_VALUE_UID, uid) + document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp'])) + document.add_value(_VALUE_ACTIVITY_ID, properties['activity_id']) + document.add_value(_VALUE_MIME_TYPE, str(properties['mime_type'])) + document.add_value(_VALUE_ACTIVITY, properties['activity']) + + term_generator = xapian.TermGenerator() + + # TODO: we should do stemming, but in which language? + #if language is not None: + # term_generator.set_stemmer(_xapian.Stem(language)) + + # TODO: we should use a stopper + #if stop is not None: + # stopper = _xapian.SimpleStopper() + # for term in stop: + # stopper.add (term) + # term_generator.set_stopper (stopper) + + term_generator.set_document(document) + term_generator.index_text_without_positions( + self._extract_text(properties), 1, '') + + if not self.contains(uid): + self._database.add_document(document) + else: + self._database.replace_document(_PREFIX_UID + uid, document) + self._flush() + + def _extract_text(self, properties): + text = '' + for key, value in properties.items(): + if key not in _PROPERTIES_NOT_TO_INDEX: + if text: + text += ' ' + text += str(value) + return text + + def find(self, query): + enquire = Enquire(self._database) + enquire.set_query(self._parse_query(query)) + + offset = query.get('offset', 0) + limit = query.get('limit', MAX_QUERY_LIMIT) + + # This will assure that the results count is exact. 
+ check_at_least = offset + limit + 1 + + enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) + + query_result = enquire.get_mset(offset, limit, check_at_least) + total_count = query_result.get_matches_estimated() + + uids = [] + for hit in query_result: + uids.append(hit.document.get_value(_VALUE_UID)) + + return (uids, total_count) + + def _parse_query(self, query_dict): + logging.debug('_parse_query %r' % query_dict) + queries = [] + + if query_dict.has_key('query'): + query_parser = QueryParser() + query_parser.set_database(self._database) + #query_parser.set_default_op(Query.OP_AND) + + # TODO: we should do stemming, but in which language? + #query_parser.set_stemmer(_xapian.Stem(lang)) + #query_parser.set_stemming_strategy(qp.STEM_SOME) + + query = query_parser.parse_query( + query_dict['query'], + QueryParser.FLAG_PHRASE | + QueryParser.FLAG_BOOLEAN | + QueryParser.FLAG_LOVEHATE | + QueryParser.FLAG_WILDCARD, + '') + + queries.append(query) + + self._replace_mtime(query_dict) + if query_dict.has_key('timestamp'): + start = str(query_dict['timestamp'].pop('start', 0)) + end = str(query_dict['timestamp'].pop('end', _MAX_RESULTS)) + query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end) + queries.append(query) + + if query_dict.has_key('uid'): + queries.append(Query(_PREFIX_UID + query_dict['uid'])) + + if query_dict.has_key('activity'): + queries.append(Query(_PREFIX_ACTIVITY + query_dict['activity'])) + + if query_dict.has_key('mime_type'): + mime_queries = [] + for mime_type in query_dict['mime_type']: + mime_queries.append(Query(_PREFIX_MIME_TYPE + mime_type)) + queries.append(Query(Query.OP_OR, mime_queries)) + + if not queries: + queries.append(Query('')) + + return Query(Query.OP_AND, queries) + + def _replace_mtime(self, query): + # TODO: Just a hack for the current journal that filters by mtime + DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' + if query.has_key('mtime'): + mtime_range = query.pop('mtime') + + start = mtime_range['start'][:-7] + start = time.mktime(time.strptime(start, DATE_FORMAT)) + + end = mtime_range['end'][:-7] + # FIXME: this will give an unexpected result if the journal is in a + # different timezone + end = time.mktime(time.strptime(end, DATE_FORMAT)) + + query['timestamp'] = {'start': int(start), 'end': int(end)} + + def delete(self, uid): + self._database.delete_document(_PREFIX_UID + uid) + + def get_activities(self): + activities = [] + for term in self._database.allterms(_PREFIX_ACTIVITY): + activities.append(term.term[len(_PREFIX_ACTIVITY):]) + return activities + + def _flush_timeout_cb(self): + self._flush(True) + return False + + def _flush(self, force=False): + """Called after any database mutation""" + logging.debug('IndexStore.flush: %r %r' % (force, self._pending_writes)) + + if self._flush_timeout is not None: + gobject.source_remove(self._flush_timeout) + self._flush_timeout = None + + self._pending_writes += 1 + if force or self._pending_writes > _FLUSH_THRESHOLD: + self._database.flush() + self._pending_writes = 0 + else: + self._flush_timeout = gobject.timeout_add(_FLUSH_TIMEOUT * 1000, + self._flush_timeout_cb) + diff --git a/src/olpc/datastore/layoutmanager.py b/src/olpc/datastore/layoutmanager.py new file mode 100644 index 0000000..2b3e521 --- /dev/null +++ b/src/olpc/datastore/layoutmanager.py @@ -0,0 +1,101 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; 
either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os + +MAX_QUERY_LIMIT = 40960 + +class LayoutManager(object): + """Provide the logic about how entries are stored inside the datastore directory + """ + def __init__(self): + profile = os.environ.get('SUGAR_PROFILE', 'default') + base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) + + self._root_path = os.path.join(base_dir, 'datastore') + + if not os.path.exists(self._root_path): + os.makedirs(self._root_path) + self.set_version(1) + + self._create_if_needed(self.get_checksums_dir()) + self._create_if_needed(self.get_queue_path()) + + index_updated_path = os.path.join(self._root_path, 'index_updated') + self._index_updated = os.path.exists(index_updated_path) + + def _create_if_needed(self, path): + if not os.path.exists(path): + os.makedirs(path) + + def get_version(self): + version_path = os.path.join(self._root_path, 'version') + version = 0 + if os.path.exists(version_path): + version = int(open(version_path, 'r').read()) + return version + + def set_version(self, version): + version_path = os.path.join(self._root_path, 'version') + open(version_path, 'w').write(str(version)) + + def get_entry_path(self, uid): + # os.path.join() is just too slow + return '%s/%s/%s' % (self._root_path, uid[:2], uid) + + def get_root_path(self): + return self._root_path + + def get_index_path(self): + return os.path.join(self._root_path, 'index') + + def get_checksums_dir(self): + return os.path.join(self._root_path, 'checksums') + + def get_queue_path(self): + return os.path.join(self.get_checksums_dir(), 'queue') + + def _is_index_updated(self): + return self._index_updated + + def _set_index_updated(self, index_updated): + if index_updated != self._index_updated: + self._index_updated = index_updated + + index_updated_path = os.path.join(self._root_path, 'index_updated') + if os.path.exists(index_updated_path): + os.remove(index_updated_path) + else: + open(index_updated_path, 'w').close() + + index_updated = property(_is_index_updated, _set_index_updated) + + def find_all(self): + uids = [] + for f in os.listdir(self._root_path): + if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: + for g in os.listdir(os.path.join(self._root_path, f)): + if len(g) == 36: + uids.append(g) + return uids + +_instance = None +def get_instance(): + global _instance + if _instance is None: + _instance = LayoutManager() + return _instance + diff --git a/src/olpc/datastore/metadatareader.c b/src/olpc/datastore/metadatareader.c new file mode 100644 index 0000000..ce6d38e --- /dev/null +++ b/src/olpc/datastore/metadatareader.c @@ -0,0 +1,199 @@ +#include "Python.h" + +#include <dirent.h> + +// TODO: put it in a place where python can use it when writing metadata +#define MAX_PROPERTY_LENGTH 500 * 1024 + +static PyObject *byte_array_type = NULL; + +static PyObject * +metadatareader_retrieve(PyObject *unused, PyObject *args) +{ + PyObject *dict = NULL; + PyObject *properties = NULL; + const char *dir_path = NULL; + char *metadata_path = NULL; + DIR *dir_stream 
= NULL; + struct dirent *dir_entry = NULL; + char *file_path = NULL; + FILE *file = NULL; + char *value_buf = NULL; + + if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties)) + return NULL; + + // Build path to the metadata directory + int metadata_path_size = strlen(dir_path) + 10; + metadata_path = PyMem_Malloc(metadata_path_size); + if (metadata_path == NULL) { + PyErr_NoMemory(); + goto cleanup; + } + snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata"); + + dir_stream = opendir (metadata_path); + if (dir_stream == NULL) { + char buf[256]; + snprintf(buf, sizeof(buf), "Couldn't open metadata directory %s", + metadata_path); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + dict = PyDict_New(); + + dir_entry = readdir(dir_stream); + while (dir_entry != NULL) { + long file_size; + int file_path_size; + PyObject *value = NULL; + + // Skip . and .. + if (dir_entry->d_name[0] == '.' && + (strlen(dir_entry->d_name) == 1 || + (dir_entry->d_name[1] == '.' && + strlen(dir_entry->d_name) == 2))) + goto next_property; + + // Check if the property is in the properties list + if ((properties != Py_None) && (PyList_Size(properties) > 0)) { + int found = 0; + int i; + for (i = 0; i < PyList_Size(properties); i++) { + PyObject *property = PyList_GetItem(properties, i); + if (!strcmp (dir_entry->d_name, PyString_AsString (property))) { + found = 1; + } + } + if (!found) { + goto next_property; + } + } + + // Build path of the property file + file_path_size = strlen(metadata_path) + 1 + strlen(dir_entry->d_name) + + 1; + file_path = PyMem_Malloc(file_path_size); + if (file_path == NULL) { + PyErr_NoMemory(); + goto cleanup; + } + snprintf (file_path, file_path_size, "%s/%s", metadata_path, + dir_entry->d_name); + + file = fopen(file_path, "r"); + if (file == NULL) { + char buf[256]; + snprintf(buf, sizeof(buf), "Cannot open property file %s: %s", + file_path, strerror(errno)); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + // Get file size + fseek (file, 0, SEEK_END); + file_size = ftell (file); + rewind (file); + + if (file_size == 0) { + // Empty property + value = PyString_FromString(""); + if (value == NULL) { + PyErr_SetString(PyExc_ValueError, + "Failed to convert value to python string"); + goto cleanup; + } + } else { + if (file_size > MAX_PROPERTY_LENGTH) { + PyErr_SetString(PyExc_ValueError, "Property file too big"); + goto cleanup; + } + + // Read the whole file + value_buf = PyMem_Malloc(file_size); + if (value_buf == NULL) { + PyErr_NoMemory(); + goto cleanup; + } + long read_size = fread(value_buf, 1, file_size, file); + if (read_size < file_size) { + char buf[256]; + snprintf(buf, sizeof(buf), + "Error while reading property file %s", file_path); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + // Convert value to dbus.ByteArray + PyObject *args = Py_BuildValue("(s#)", value_buf, file_size); + value = PyObject_CallObject(byte_array_type, args); + if (value == NULL) { + PyErr_SetString(PyExc_ValueError, + "Failed to convert value to dbus.ByteArray"); + goto cleanup; + } + } + + // Add property to the metadata dict + if (PyDict_SetItemString(dict, dir_entry->d_name, value) == -1) { + PyErr_SetString(PyExc_ValueError, + "Failed to add property to dictionary"); + goto cleanup; + } + + next_property: + if (file_path) { + PyMem_Free(file_path); + file_path = NULL; + } + if (file) { + fclose(file); + file = NULL; + } + if (value_buf) { + PyMem_Free(value_buf); + value_buf = NULL; + } + + dir_entry = 
readdir(dir_stream); + } + + closedir(dir_stream); + + return dict; + +cleanup: + if (file_path) { + PyMem_Free(file_path); + } + if (value_buf) { + PyMem_Free(value_buf); + } + if (dict) { + Py_DECREF(dict); + } + if (file) { + fclose(file); + } + if (dir_stream) { + closedir(dir_stream); + } + return NULL; +} + +static PyMethodDef metadatareader_functions[] = { + {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a file")}, + {NULL, NULL, 0, NULL} +}; + +PyMODINIT_FUNC +initmetadatareader(void) +{ + PyObject* mod; + mod = Py_InitModule("metadatareader", metadatareader_functions); + + PyObject *dbus_module = PyImport_ImportModule("dbus"); + byte_array_type = PyObject_GetAttrString(dbus_module, "ByteArray"); +} + diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py new file mode 100644 index 0000000..8d1e377 --- /dev/null +++ b/src/olpc/datastore/metadatastore.py @@ -0,0 +1,50 @@ +import os + +from olpc.datastore import layoutmanager +from olpc.datastore import metadatareader + +MAX_SIZE = 256 + +class MetadataStore(object): + def store(self, uid, metadata): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + metadata_path = os.path.join(dir_path, 'metadata') + if not os.path.exists(metadata_path): + os.makedirs(metadata_path) + else: + for key in os.listdir(metadata_path): + os.remove(os.path.join(metadata_path, key)) + + metadata['uid'] = uid + for key, value in metadata.items(): + open(os.path.join(metadata_path, key), 'w').write(str(value)) + + def retrieve(self, uid, properties=None): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + return metadatareader.retrieve(dir_path, properties) + + def delete(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + for key in os.listdir(metadata_path): + os.remove(os.path.join(metadata_path, key)) + os.rmdir(metadata_path) + + def get_property(self, uid, key): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + if os.path.exists(property_path): + return open(property_path, 'r').read() + else: + return None + + def set_property(self, uid, key, value): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + open(property_path, 'w').write(value) + diff --git a/src/olpc/datastore/migration.py b/src/olpc/datastore/migration.py new file mode 100644 index 0000000..7b47d6f --- /dev/null +++ b/src/olpc/datastore/migration.py @@ -0,0 +1,81 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
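metadatareader.c and MetadataStore above settle on one small file per property under <entry>/metadata; the C extension exists purely for speed. An illustrative, unoptimized Python equivalent of the read side:

import os

def read_metadata(dir_path, properties=None):
    # Read each property file under <entry>/metadata, optionally
    # restricted to the requested property names.
    metadata = {}
    metadata_path = os.path.join(dir_path, 'metadata')
    for name in os.listdir(metadata_path):
        if properties and name not in properties:
            continue
        f = open(os.path.join(metadata_path, name), 'r')
        try:
            metadata[name] = f.read()
        finally:
            f.close()
    return metadata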
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +"""Transform one DataStore directory in a newer format. +""" + +import os +import logging +import shutil + +import cjson + +from olpc.datastore import layoutmanager + +def migrate_from_0(): + logging.info('Migrating datastore from version 0 to version 1') + root_path = layoutmanager.get_instance().get_root_path() + old_root_path = os.path.join(root_path, 'store') + for f in os.listdir(old_root_path): + uid, ext = os.path.splitext(f) + if ext != '.metadata': + continue + + logging.debug('Migrating entry %r' % uid) + try: + _migrate_metadata(root_path, old_root_path, uid) + _migrate_file(root_path, old_root_path, uid) + _migrate_preview(root_path, old_root_path, uid) + except Exception: + #logging.warning('Failed to migrate entry %r:%s\n' %(uid, + # ''.join(traceback.format_exception(*sys.exc_info())))) + # + # In production, we may choose to ignore errors when failing to + # migrate some entries. But for now, raise them. + raise + + # Just be paranoid, it's cheap. + if old_root_path.endswith('datastore/store'): + shutil.rmtree(old_root_path) + + logging.info('Migration finished') + +def _migrate_metadata(root_path, old_root_path, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + os.makedirs(metadata_path) + + old_metadata_path = os.path.join(old_root_path, uid + '.metadata') + metadata = cjson.decode(open(old_metadata_path, 'r').read()) + for key, value in metadata.items(): + f = open(os.path.join(metadata_path, key), 'w') + try: + f.write(str(value)) + finally: + f.close() + +def _migrate_file(root_path, old_root_path, uid): + if os.path.exists(os.path.join(old_root_path, uid)): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + os.rename(os.path.join(old_root_path, uid), + os.path.join(dir_path, 'data')) + +def _migrate_preview(root_path, old_root_path, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + os.rename(os.path.join(old_root_path, 'preview', uid), + os.path.join(metadata_path, 'preview')) + diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py deleted file mode 100644 index e4a3e3b..0000000 --- a/src/olpc/datastore/model.py +++ /dev/null @@ -1,412 +0,0 @@ -""" -olpc.datastore.model -~~~~~~~~~~~~~~~~~~~~ -The datamodel for the metadata - -""" - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - -import datetime -import os -import time -import warnings -import logging - -from sugar import mime - -from olpc.datastore.utils import timeparse - - -# XXX: Open issues -# list properties - Contributors (a, b, c) -# difficult to index now -# content state - searches don't include content deletion flag -# - not recording if content is on other storage yet - -propertyTypes = {} -_marker = object() - -def registerPropertyType(kind, get, set, xapian_sort_type=None, - defaults=None, for_xapian=None, from_xapain=None): - propertyTypes[kind] = PropertyImpl(get, set, xapian_sort_type, - defaults, for_xapian=for_xapian, from_xapain=from_xapain) - -def propertyByKind(kind): return propertyTypes[kind] - -class PropertyImpl(object): - __slots__ = ('_get', '_set', 
'xapian_sort_type', 'defaults', '_for_xapian', '_from_xapian') - - def __init__(self, get, set, xapian_sort_type=None, defaults=None, - for_xapian=None, from_xapain=None): - self._get, self._set = get, set - self.xapian_sort_type = xapian_sort_type - self.defaults = defaults - if not for_xapian: for_xapian = self._get - self._for_xapian = for_xapian - if not from_xapain: from_xapain = self._set - self._from_xapian = from_xapain - - def get(self, value): return self._get(value) - def set(self, value): return self._set(value) - def for_xapian(self, value): return self._for_xapian(value) - def from_xapian(self, value): return self._from_xapian(value) - -class Property(object): - """Light-weight property implementation. - Handles typed properties via a global registry of type->callbacks - - >>> p = Property(key, value, 'string') - >>> b = Property(key, value, 'binary') - """ - def __init__(self, key, value, kind=None): - - self.kind = kind - if kind not in propertyTypes: - warnings.warn("Unknown property type: %s on key %s" % \ - (kind, key), RuntimeWarning) - else: self._impl = propertyTypes[kind] - - self.key = key - self.value = value - - @classmethod - def fromstring(cls, key, value=''): - kind = 'string' - if ':' in key: - key, kind = key.split(':', 1) - # now resolve the kind to a property class - return cls(key, value, kind) - - - def __repr__(self): - return "<%s(%s) %s:%r>" % (self.__class__.__name__, - self.kind, - self.key, self.value) - - def get_value(self): return self._impl.get(self._value) - def set_value(self, value): self._value = self._impl.set(value) - value = property(get_value, set_value) - - @property - def for_xapian(self): return self._impl.for_xapian(self._value) - - - def __str__(self): return str(self.value) - -class Model(object): - """Object containing the field/property model used by the - system""" - - def __init__(self): - self.fields = {} - self.fieldnames = [] - - def copy(self): - m = Model() - m.fields = self.fields.copy() - m.fieldnames = self.fieldnames[:] - return m - - def fromstring(self, key, value, allowAddition=False): - """create a property from the key name by looking it up in the - model.""" - kind = None - if ':' in key: key, kind = key.split(':', 1) - added = False - field = self.fields.get(key) - if field: mkind = field[1] - elif allowAddition: - # create a new field, this will force a change in the - # model - # and in turn should add a new field action - if not kind: kind = "string" - self.addField(key,kind) - mkind = kind - added = True - else: - raise KeyError("no field specification for %s" % key) - - if kind and mkind: - if kind != mkind: raise ValueError("""Specified wire - encoding for property %s was %s, expected %s""" %(key, kind, mkind)) - kind = mkind - - return Property(key, value, kind), added - - - def addField(self, key, kind, overrides=None): - """ Add a field to the model. 
- key -- field name - kind -- type by name (registered with registerPropertyType) - kwargs -- overrides and additional values to the default - arguments supplied by kind - """ - if key in self.fields: - raise KeyError("""Another source tried to add %s field to the model""" % key) - - impl = propertyByKind(kind) - options = impl.defaults.copy() - if overrides: options.update(overrides) - if impl.xapian_sort_type: - if 'type' not in options: - options['type'] = impl.xapian_sort_type - - self.fields[key] = (key, kind, options) - self.fieldnames.append(key) - return self - - def addFields(self, *args): - """ List of arguments to addField """ - for arg in args: self.addField(*arg) - return self - - def apply(self, indexmanager): - addField = indexmanager.addField - for fn in self.fieldnames: - args = self.fields[fn] - addField(args[0], **args[2]) - - def get_external_properties(self): - external_properties = [] - for field_name in self.fields: - field = self.fields.get(field_name) - if field[1] == "external": - external_properties.append(field[0]) - return external_properties - -# Properties we don't automatically include in properties dict -EXCLUDED_PROPERTIES = ['fulltext', ] - -class Content(object): - """A light weight proxy around Xapian Documents from secore. - This provides additional methods which are used in the - backingstore to assist in storage - """ - __slots__ = ('_doc', '_backingstore', '_file', '_model') - - def __init__(self, xapdoc, backingstore=None, model=None): - self._doc = xapdoc - self._backingstore = backingstore - self._file = None - self._model = model - - def __repr__(self): - return "<%s %s>" %(self.__class__.__name__, - self.properties) - - def get_property(self, key, default=_marker): - result = self._doc.data.get(key, default) - if result is _marker: raise KeyError(key) - if isinstance(result, list) and len(result) == 1: - result = result[0] - field = self._model.fields.get(key) - if field[1] == "external": - return self.get_external_property(key) - else: - kind = propertyByKind(field[1]) - # Errors here usually property request for a missing field - return kind.from_xapian(result) - - def get_external_property(self, key): - return self._backingstore.get_external_property(self.id, key) - - @property - def properties(self): - d = {} - for k in self._model.fields: - if k in EXCLUDED_PROPERTIES: continue - - field = self._model.fields.get(k) - if field: - if field[1] == "external": - v = self.get_external_property(k) - else: - v = self.data.get(k, _marker) - if v is _marker: continue - if isinstance(v, list) and len(v) == 1: - v = v[0] - kind = propertyByKind(field[1]) - v = kind.from_xapian(v) - else: - # do some generic property handling - if v: v = str(v) - else: v = '' - d[k] = v - return d - - def _get_extension_from_mimetype(self): - # try to get an extension from the mimetype if available - mt = self.get_property('mime_type', None) - if mt is not None: - ext = mime.get_primary_extension(mt) - # .ksh is a strange ext for plain text - if ext and ext == '.ksh': ext = '.txt' - if ext and ext == '.jpe': ext = '.jpg' # fixes #3163 - return ext - return None - - def suggestName(self): - # we look for certain known property names - # - filename - # - ext - # and create a base file name that will be used for the - # checkout name - filename = self.get_property('filename', None) - ext = self.get_property('ext', '') - if not ext: - ext = self._get_extension_from_mimetype() - - logging.debug('Content.suggestName: %r %r' % (filename, ext)) - - if filename: - # some 
backingstores keep the full relative path - filename = os.path.split(filename)[1] - f, e = os.path.splitext(filename) - if e: return filename, None - if ext: return "%s.%s" % (filename, ext), None - elif ext: - return None, ext - - return None, None - - def get_file(self): - if not hasattr(self, "_file") or not self._file or \ - self._file.closed is True: - target, ext = self.suggestName() - targetfile = self.backingstore._targetFile(self.id, target, ext) - self._file = targetfile - return self._file - - def set_file(self, fileobj): - self._file = fileobj - file = property(get_file, set_file) - - @property - def filename(self): return os.path.abspath(self.file.name) - - @property - def contents(self): return self.file.read() - - @property - def backingstore(self): return self._backingstore - - @property - def id(self): return self._doc.id - - @property - def data(self): return self._doc.data - - -def noop(value): return value - -import re -base64hack = re.compile("(\S{212})") -def base64enc(value): return ' '.join(base64hack.split(value.encode('base64'))) -def base64dec(value): return value.replace(' ', '').decode('base64') - -DATEFORMAT = "%Y-%m-%dT%H:%M:%S" -def date2string(value): return value.replace(microsecond=0).isoformat() -def string2date(value): return timeparse(value, DATEFORMAT) - -def encode_datetime(value): - # encode datetime to timestamp (float) - # parse the typelib form to a datetime first - if isinstance(value, basestring): value = string2date(value) - return str(time.mktime(value.timetuple())) - -def decode_datetime(value): - # convert a float to a local datetime - return datetime.datetime.fromtimestamp(float(value)).isoformat() - -def datedec(value, dateformat=DATEFORMAT): - return timeparse(value, DATEFORMAT) - -def dateenc(value, dateformat=DATEFORMAT): - if isinstance(value, basestring): - # XXX: there is an issue with microseconds not getting parsed - value = timeparse(value, DATEFORMAT) - value = value.replace(microsecond=0) - return value.isoformat() - - - -# type, get, set, xapian sort type [string|float|date], defaults -# defaults are the default options to addField in IndexManager -# these can be overridden on model assignment -registerPropertyType('string', noop, noop, 'string', {'store' : True, - 'exact' : True, - 'sortable' : True}) - -registerPropertyType('text', noop, noop, 'string', {'store' : True, - 'exact' : False, - 'sortable' : False, - 'collapse' : True, - }) - -registerPropertyType('binary', noop, noop, None, {'store' : True, - 'exact' : False, - 'fulltext': False, - 'sortable' : False}) - -registerPropertyType('int', str, int, 'float', {'store' : True, - 'exact' : True, - 'sortable' : True}, - for_xapian=str) - -registerPropertyType('number', str, float, 'float', {'store' : True, - 'exact' : True, - 'sortable' : True}) - -registerPropertyType('date', dateenc, datedec, 'float', {'store' : True, - 'exact' : True, - 'sortable' : True - }, - for_xapian=encode_datetime, - from_xapain=decode_datetime) - - -registerPropertyType('external', noop, noop, 'string', {'external' : True, - 'store' : False, - 'exact' : False, - 'fulltext' : False, - }) - -defaultModel = Model().addFields( - ('fulltext', 'text'), - # vid is version id - ('vid', 'number'), - ('checksum', 'string'), - ('filename', 'string'), - ('ext', 'string'), # its possible we don't store a filename, but - # only an extension we are interested in - # Title has additional weight - ('title', 'text', {'weight' : 2 }), - ('url', 'string'), - ('mime_type', 'string'), - ('author', 'string'), - 
('language', 'string'), - ('ctime', 'date'), - ('mtime', 'date'), - # Better store the timestamp instead of date strings - ('timestamp', 'int'), - # this will just be a space delimited list of tags - # indexed with the content - # I give them high weight as they have user given semantic value. - ('tags', 'text', {'weight' :3 } ), - - # olpc specific - ('activity', 'string'), - ('activity_id', 'string'), - ('title_set_by_user', 'text'), - ('keep', 'int'), - ('icon-color', 'string'), - ('preview', 'external'), - ('buddies', 'text'), - ('source', 'text'), - ('description', 'text'), - ) - diff --git a/src/olpc/datastore/optimizer.py b/src/olpc/datastore/optimizer.py new file mode 100644 index 0000000..ed62e55 --- /dev/null +++ b/src/olpc/datastore/optimizer.py @@ -0,0 +1,153 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import errno +import logging + +import gobject + +from olpc.datastore import layoutmanager + +class Optimizer(object): + """Optimizes disk space usage by detecting duplicates and sharing storage. + """ + def __init__(self, file_store, metadata_store): + self._file_store = file_store + self._metadata_store = metadata_store + self._enqueue_checksum_id = None + + def optimize(self, uid): + """Add an entry to a queue of entries to be checked for duplicates. + + """ + if not os.path.exists(self._file_store.get_file_path(uid)): + return + + queue_path = layoutmanager.get_instance().get_queue_path() + open(os.path.join(queue_path, uid), 'w').close() + logging.debug('optimize %r' % os.path.join(queue_path, uid)) + + if self._enqueue_checksum_id is None: + self._enqueue_checksum_id = \ + gobject.idle_add(self._process_entry_cb, + priority=gobject.PRIORITY_LOW) + + def remove(self, uid): + """Remove any structures left from space optimization + + """ + checksum = self._metadata_store.get_property(uid, 'checksum') + if checksum is None: + return + + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + logging.debug('remove %r' % os.path.join(checksum_path, uid)) + os.remove(os.path.join(checksum_path, uid)) + try: + os.rmdir(checksum_path) + logging.debug('removed %r' % checksum_path) + except OSError, e: + if e.errno != errno.ENOTEMPTY: + raise + + def _identical_file_already_exists(self, checksum): + """Check if we already have files with this checksum. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(checksum_path) + + def _get_uid_from_checksum(self, checksum): + """Get an existing entry which file matches checksum. 
+ + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + first_uid = os.listdir(checksum_path)[0] + return first_uid + + def _create_checksum_dir(self, checksum): + """Create directory that tracks files with this same checksum. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + logging.debug('create dir %r' % checksum_path) + os.mkdir(checksum_path) + + def _add_checksum_entry(self, uid, checksum): + """Create a file in the checksum dir with the uid of the entry + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + logging.debug('touch %r' % os.path.join(checksum_path, uid)) + open(os.path.join(checksum_path, uid), 'w').close() + + def _already_linked(self, uid, checksum): + """Check if this entry's file is already a hard link to the checksums + dir. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(os.path.join(checksum_path, uid)) + + def _process_entry_cb(self): + """Process one item in the checksums queue by calculating its checksum, + checking if there exist already an identical file, and in that case + substituting its file with a hard link to that pre-existing file. + + """ + queue_path = layoutmanager.get_instance().get_queue_path() + queue = os.listdir(queue_path) + if queue: + uid = queue[0] + logging.debug('_process_entry_cb processing %r' % uid) + file_in_entry_path = self._file_store.get_file_path(uid) + checksum = self._calculate_md5sum(file_in_entry_path) + self._metadata_store.set_property(uid, 'checksum', checksum) + + if self._identical_file_already_exists(checksum): + if not self._already_linked(uid, checksum): + existing_entry_uid = self._get_uid_from_checksum(checksum) + self._file_store.hard_link_entry(uid, existing_entry_uid) + + self._add_checksum_entry(uid, checksum) + else: + self._create_checksum_dir(checksum) + self._add_checksum_entry(uid, checksum) + + os.remove(os.path.join(queue_path, uid)) + + if len(queue) <= 1: + self._enqueue_checksum_id = None + return False + else: + return True + + def _calculate_md5sum(self, path): + """Calculate the md5 checksum of a given file. + + """ + in_, out = os.popen2(['md5sum', path]) + return out.read().split(' ', 1)[0] + diff --git a/src/olpc/datastore/utils.py b/src/olpc/datastore/utils.py deleted file mode 100644 index 0505463..0000000 --- a/src/olpc/datastore/utils.py +++ /dev/null @@ -1,162 +0,0 @@ -import datetime -import dbus -import re -import time - - -class Singleton(type): - """A singleton metaclass - - >>> class MyManager(object): - ... __metaclass__ = Singleton - >>> a = MyManager() - >>> b = MyManager() - >>> assert a is b - - """ - def __init__(cls,name,bases,dic): - super(Singleton,cls).__init__(name,bases,dic) - cls._instance=None - def __call__(cls,*args,**kw): - if cls._instance is None: - cls._instance=super(Singleton,cls).__call__(*args,**kw) - return cls._instance - -class partial: - def __init__(self, fun, *args, **kwargs): - self.fun = fun - self.pending = args - self.kwargs = kwargs - - def __call__(self, *args, **kwargs): - if kwargs and self.kwargs: - kw = self.kwargs.copy() - kw.update(kwargs) - else: - kw = kwargs or self.kwargs - - return self.fun(*(self.pending + args), **kw) - -def once(method): - "A decorator that runs a method only once." 
- attrname = "_called" - def decorated(self, *args, **kwargs): - try: - return getattr(method, attrname) - except AttributeError: - r = method(self, *args, **kwargs) - setattr(method, attrname, r) - return r - return decorated - - - -def create_uid(): - # this is linux specific but easily changed - # Python 2.5 has universal support for this built in - return open('/proc/sys/kernel/random/uuid', 'r').read()[:-1] - - -def options_for(dict, prefix, invert=False): - """return a dict of the filtered properties for keys with prefix. - prefix will be removed - - If invert is True then only those keys not matching prefix are returned. - - >>> assert options_for({'app.a.option' : 1, 'app.b.option' : 2}, 'app.b.')['option'] == 2 - """ - d = {} - l = len(prefix) - for k, v in dict.iteritems(): - if k.startswith(prefix): - if invert is False:d[k[l:]] = v - elif invert is True: - d[k] = v - - return d - - - -def _convert(arg): - # this recursively processes arguments sent over dbus and yields - # normalized versions - if isinstance(arg, (dbus.String, dbus.UTF8String)): - try: return arg.encode('utf-8') - except: return str(arg) - - if isinstance(arg, (dbus.Dictionary, dict)): - d = {} - for k, v in arg.iteritems(): - # here we call str on the lhs making it suitable for - # passing as keywords args - d[str(_convert(k))] = _convert(v) - return d - - if isinstance(arg, dbus.Array): - a = [] - for item in arg: - a.append(_convert(item)) - return a - return arg - - -def sanitize_dbus(method): - # decorator to produce an alternative version of arguments based on pure Python - # types. - def decorator(self, *args, **kwargs): - n = [] - for arg in args: n.append(_convert(arg)) - kw = _convert(kwargs) - return method(self, *n, **kw) - return decorator - -DATEFORMAT = "%Y-%m-%dT%H:%M:%S" -def timeparse(t, format=DATEFORMAT): - """Parse a time string that might contain fractions of a second. - - Fractional seconds are supported using a fragile, miserable hack. - Given a time string like '02:03:04.234234' and a format string of - '%H:%M:%S', time.strptime() will raise a ValueError with this - message: 'unconverted data remains: .234234'. If %S is in the - format string and the ValueError matches as above, a datetime - object will be created from the part that matches and the - microseconds in the time string. - """ - try: - return datetime.datetime(*time.strptime(t, format)[0:6]) - except ValueError, msg: - if "%S" in format: - msg = str(msg) - mat = re.match(r"unconverted data remains:" - " \.([0-9]{1,6})$", msg) - if mat is not None: - # fractional seconds are present - this is the style - # used by datetime's isoformat() method - frac = "." + mat.group(1) - t = t[:-len(frac)] - t = datetime.datetime(*time.strptime(t, format)[0:6]) - microsecond = int(float(frac)*1e6) - return t.replace(microsecond=microsecond) - else: - mat = re.match(r"unconverted data remains:" - " \,([0-9]{3,3})$", msg) - if mat is not None: - # fractional seconds are present - this is the style - # used by the logging module - frac = "." 
+ mat.group(1) - t = t[:-len(frac)] - t = datetime.datetime(*time.strptime(t, format)[0:6]) - microsecond = int(float(frac)*1e6) - return t.replace(microsecond=microsecond) - - raise - - -def parse_timestamp_or_float(value): - result = None - try: - result = timeparse(value) - result = str(time.mktime(result.timetuple())) - except: - result = str(float(value)) - return result diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py deleted file mode 100644 index e8a5ee5..0000000 --- a/src/olpc/datastore/xapianindex.py +++ /dev/null @@ -1,575 +0,0 @@ -""" -xapianindex -~~~~~~~~~~~~~~~~~~~~ -maintain indexes on content - -""" -from __future__ import with_statement - -__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>' -__docformat__ = 'restructuredtext' -__copyright__ = 'Copyright ObjectRealms, LLC, 2007' -__license__ = 'The GNU Public License V2+' - - - -from Queue import Queue, Empty -import gc -import logging -import re -import sys -import time -import thread -import threading -import warnings -import traceback - -import secore -import xapian as _xapian # we need to modify the QueryParser -import gobject - -from olpc.datastore import model -from olpc.datastore.converter import converter -from olpc.datastore.utils import create_uid, parse_timestamp_or_float - - -# Setup Logger -logger = logging.getLogger('org.sugar.datastore.xapianindex') - -# Indexer Operations -CREATE = 1 -UPDATE = 2 -DELETE = 3 - -# Force a flush every _n_ changes to the db -FLUSH_THRESHOLD = 20 - -# Force a flush after _n_ seconds since the last change to the db -FLUSH_TIMEOUT = 60 - -class ReadWriteConnection(secore.indexerconnection.IndexerConnection, - secore.searchconnection.SearchConnection): - # has search methods on a write connection - pass - -class ContentMappingIter(object): - """An iterator over a set of results from a search. 
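The timeparse() helper deleted from utils.py above recovers fractional seconds that time.strptime() cannot parse, by catching the "unconverted data remains" ValueError and re-attaching the fraction. The recovery step, shown in isolation:

    import datetime
    import time

    # time.strptime() would reject '2008-10-08T16:37:38.234234' outright;
    # timeparse() strips the fraction, parses the rest, then restores it:
    frac = '.234234'
    t = datetime.datetime(*time.strptime('2008-10-08T16:37:38',
                                         '%Y-%m-%dT%H:%M:%S')[0:6])
    t = t.replace(microsecond=int(float(frac) * 1e6))
    print(t.isoformat())   # 2008-10-08T16:37:38.234234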
- - """ - def __init__(self, results, backingstore, model): - self._results = results - self._backingstore = backingstore - self._iter = iter(results) - self._model = model - - def __iter__(self): return self - - def next(self): - searchresult = self._iter.next() - return model.Content(searchresult, self._backingstore, self._model) - - -class IndexManager(object): - DEFAULT_DATABASE_NAME = 'index' - - def __init__(self, default_language='en'): - # We will maintain two connections to the database - # we trigger automatic flushes to the read_index - # after any write operation - self.write_index = None - self.read_index = None - self.queue = Queue(0) - self.indexer_running = False - self.language = default_language - - self.backingstore = None - - self.fields = set() - self._write_lock = threading.Lock() - - self.deltact = 0 # delta count - self._flush_timeout = None - - # - # Initialization - def connect(self, repo, **kwargs): - logging.debug('IndexManager.connect()') - if self.write_index is not None: - warnings.warn('''Requested redundant connect to index''', - RuntimeWarning) - - self.repo = repo - self.write_index = ReadWriteConnection(repo) - - # configure the database according to the model - datamodel = kwargs.get('model', model.defaultModel) - datamodel.apply(self) - - # store a reference - self.datamodel = datamodel - - self.read_index = self.write_index - - self.flush(force=True) - # by default we start the indexer now - self.startIndexer() - assert self.indexer.isAlive() - - - def bind_to(self, backingstore): - # signal from backingstore that its our parent - self.backingstore = backingstore - - - def stop(self, force=False): - logging.debug('IndexManager.stop()') - self.flush(force=True) - self.stopIndexer(force) - - # TODO: Would be better to check if the device is present and - # don't try to close if it's not. - try: - self.write_index.close() - except _xapian.DatabaseError, e: - logging.debug('Index close failed:\n' + \ - ''.join(traceback.format_exception(*sys.exc_info()))) - - #self.read_index.close() - # XXX: work around for xapian not having close() this will - # change in the future in the meantime we delete the - # references to the indexers and then force the gc() to run - # which should inturn trigger the C++ destructor which forces - # the database shut. - self.write_index = None - self.read_index = None - gc.collect() - - # Index thread management - def startIndexer(self): - self.indexer_running = True - self.indexer = threading.Thread(target=self.indexThread) - self.indexer.setDaemon(True) - self.indexer.start() - - def stopIndexer(self, force=False): - if not self.indexer_running: return - if not force: self.queue.join() - self.indexer_running = False - # should terminate after the current task - self.indexer.join() - - def _flush_timeout_cb(self): - self.flush(True) - return False - - # flow control - def flush(self, force=False): - logging.debug('IndexManager.flush: %r %r' % (force, self.deltact)) - """Called after any database mutation""" - - if self._flush_timeout is not None: - gobject.source_remove(self._flush_timeout) - self._flush_timeout = None - - self.deltact += 1 - if force or self.deltact > FLUSH_THRESHOLD: - with self._write_lock: - - # TODO: Would be better to check if the device is present and - # don't try to flush if it's not. 
- try: - self.write_index.flush() - except _xapian.DatabaseError, e: - logging.debug('Index flush failed:\n' + \ - ''.join(traceback.format_exception(*sys.exc_info()))) - - #self.read_index.reopen() - self.deltact = 0 - else: - self._flush_timeout = gobject.timeout_add(FLUSH_TIMEOUT * 1000, - self._flush_timeout_cb) - - def enque(self, uid, vid, doc, operation, filestuff=None): - # here we implement the sync/async policy - # we want to take create/update operations and - # set theproperties right away, the - # conversion/fulltext indexing can - # happen in the thread - if operation in (CREATE, UPDATE): - with self._write_lock: - if operation is CREATE: - self.write_index.add(doc) - logger.info("created %s:%s" % (uid, vid)) - elif operation is UPDATE: - self.write_index.replace(doc) - logger.info("updated %s:%s" % (uid, vid)) - self.flush() - - # Disable content indexing for Trial-3. - # https://dev.laptop.org/ticket/3058 - return - - # now change CREATE to UPDATE as we set the - # properties already - operation = UPDATE - if not filestuff: - # In this case we are done - return - elif operation is DELETE: - # sync deletes - with self._write_lock: - self.write_index.delete(uid) - logger.info("deleted content %s:%s" % (uid,vid)) - self.flush() - return - - self.queue.put((uid, vid, doc, operation, filestuff)) - - def indexThread(self): - # process the queue - # XXX: there is currently no way to remove items from the queue - # for example if a USB stick is added and quickly removed - # the mount should however get a stop() call which would - # request that the indexing finish - # XXX: we can in many cases index, not from the tempfile but - # from the item in the repo as that will become our immutable - # copy. Detect those cases and use the internal filename - # property or backingstore._translatePath to get at it - while self.indexer_running: - # include timeout here to ease shutdown of the thread - # if this is a non-issue we can simply allow it to block - try: - data = self.queue.get(True, 0.025) - uid, vid, doc, operation, filestuff = data - except Empty: - #time.sleep(1.0) - continue - - try: - with self._write_lock: - if operation is UPDATE: - # Here we handle the conversion of binary - # documents to plain text for indexing. This is - # done in the thread to keep things async and - # latency lower. - # we know that there is filestuff or it - # wouldn't have been queued - filename, mimetype = filestuff - if isinstance(filename, file): - filename = filename.name - fp = converter(filename, mimetype) - if fp: - # read in at a fixed block size, try to - # conserve memory. If this doesn't work - # we can make doc.fields a generator - while True: - chunk = fp.read(2048) - if not chunk: break - doc.fields.append(secore.Field('fulltext', chunk)) - - self.write_index.replace(doc) - logger.info("update file content %s:%s" % (uid, vid)) - else: - logger.warning("""Conversion process failed for document %s %s""" % (uid, filename)) - else: - logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation)) - - # tell the queue its complete - self.queue.task_done() - - # we do flush on each record now - self.flush() - except: - logger.exception("Error in indexer") - - - def complete_indexing(self): - """Intentionally block until the indexing is complete. Used - primarily in testing. 
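flush() above batches index writes: it flushes after FLUSH_THRESHOLD mutations, or FLUSH_TIMEOUT seconds after the last one, whichever comes first. The same throttling policy without the gobject main loop might look like this (a sketch using threading.Timer in place of gobject.timeout_add):

    import threading

    class FlushThrottle(object):
        def __init__(self, flush_cb, threshold=20, timeout=60):
            self._flush_cb = flush_cb
            self._threshold = threshold
            self._timeout = timeout
            self._delta = 0
            self._timer = None

        def change(self):
            # called after every database mutation
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            self._delta += 1
            if self._delta > self._threshold:
                self._flush_now()
            else:
                self._timer = threading.Timer(self._timeout, self._flush_now)
                self._timer.start()

        def _flush_now(self):
            self._delta = 0
            self._flush_cb()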
- """ - self.queue.join() - self.flush() - - # - # Field management - def addField(self, key, store=True, exact=False, sortable=False, fulltext=True, - type='string', collapse=False, - **kwargs): - language = kwargs.pop('language', self.language) - external = kwargs.pop('external', False) - - xi = self.write_index.add_field_action - - try: - if store: xi(key, secore.FieldActions.STORE_CONTENT) - if exact: xi(key, secore.FieldActions.INDEX_EXACT) - elif fulltext: - # weight -- int 1 or more - # nopos -- don't include positional information - # noprefix -- boolean - xi(key, secore.FieldActions.INDEX_FREETEXT, language=language, **kwargs) - - if sortable: - xi(key, secore.FieldActions.SORTABLE, type=type) - if collapse: - xi(key, secore.FieldActions.COLLAPSE) - except secore.IndexerError, e: - logging.warning('Could not add field %r: %s' % (key, e)) - - # track this to find missing field configurations - self.fields.add(key) - - # - # Index Functions - def _mapProperties(self, props): - """data normalization function, maps dicts of key:kind->value - to Property objects - """ - d = {} - add_anything = False - for k,v in props.iteritems(): - p, added = self.datamodel.fromstring(k, v, - allowAddition=True) - if added is True: - self.fields.add(p.key) - add_anything = True - d[p.key] = p - - if add_anything: - with self._write_lock: - self.datamodel.apply(self) - - return d - - def index(self, props, filename=None): - """Index the content of an object. - Props must contain the following: - key -> Property() - """ - operation = UPDATE - # - # Version handling - # - # we implicitly create new versions of documents the version - # id should have been set by the higher level system - uid = props.pop('uid', None) - vid = props.pop('vid', None) - - if not uid: - uid = create_uid() - operation = CREATE - - if vid: vid = str(float(vid) + 1.0) - else: vid = "1.0" - - # Property mapping via model - props = self._mapProperties(props) - doc = secore.UnprocessedDocument() - add = doc.fields.append - fp = None - - - filestuff = None - if filename: - # enque async file processing - # XXX: to make sure the file is kept around we could keep - # and open fp? - mimetype = props.get("mime_type") - mimetype = mimetype and mimetype.value or 'text/plain' - filestuff = (filename, mimetype) - - doc.id = uid - add(secore.Field('vid', vid)) - - # - # Property indexing - for k, prop in props.iteritems(): - value = prop.for_xapian - - if k not in self.fields: - warnings.warn("""Missing field configuration for %s""" % k, - RuntimeWarning) - continue - - # XXX: wrap this in a proper API - if self.datamodel.fields[k][1] == "external": - # XXX: this is done directly inline and could block, - # but the expected datasize is still small - self.backingstore.set_external_property(uid, k, value) - else: - add(secore.Field(k, value)) - - # queue the document for processing - self.enque(uid, vid, doc, operation, filestuff) - - return uid - - def get(self, uid): - doc = self.read_index.get_document(uid) - if not doc: raise KeyError(uid) - return model.Content(doc, self.backingstore, self.datamodel) - - def delete(self, uid): - # does this need queuing? - # the higher level abstractions have to handle interaction - # with versioning policy and so on - self.enque(uid, None, None, DELETE) - - # - # Search - def search(self, query, start_index=0, end_index=4096, order_by=None): - """search the xapian store. - query is a string defining the serach in standard web search syntax. - - ie: it contains a set of search terms. 
Each search term may be - preceded by a "+" sign to indicate that the term is required, or a "-" - to indicate that it is required to be absent. - """ - ri = self.read_index - if not query: - q = self.read_index.query_all() - elif isinstance(query, dict): - queries = [] - q = query.pop('query', None) - if q: - queries.append(self.parse_query(q)) - if not query and not queries: - # we emptied it - q = self.read_index.query_all() - else: - # each term becomes part of the query join - for k, v in query.iteritems(): - if isinstance(v, dict): - # it might be a range scan - # this needs to be factored out - # and/or we need a client-side lib that helps - # issue queries because there are type - # conversion issues here - start = v.pop('start', 0) - end = v.pop('end', sys.maxint) - start = parse_timestamp_or_float(start) - end = parse_timestamp_or_float(end) - queries.append(ri.query_range(k, start, end)) - elif isinstance(v, list): - # construct a set of OR queries - ors = [] - for item in v: ors.append(ri.query_field(k, item)) - queries.append(ri.query_composite(ri.OP_OR, ors)) - else: - queries.append(ri.query_field(k, v)) - - q = ri.query_composite(ri.OP_AND, queries) - else: - q = self.parse_query(query) - - if isinstance(order_by, (list, tuple)): - # secore only handles a single item, not a multilayer sort - if order_by: order_by = order_by[0] - else: order_by = None - - results = ri.search(q, start_index, end_index, sortby=order_by, - checkatleast=sys.maxint) - count = results.matches_estimated - - # map the result set to model.Content items - return ContentMappingIter(results, self.backingstore, self.datamodel), count - - def get_all_ids(self): - return [ti.term[1:] for ti in self.read_index._index.allterms('Q')] - - def get_uniquevaluesfor(self, property): - # XXX: this is very sketchy code - # try to get the searchconnection to support this directly - # this should only apply to EXACT fields - r = set() - prefix = self.read_index._field_mappings.get_prefix(property) - plen = len(prefix) - termiter = self.read_index._index.allterms(prefix) - for t in termiter: - term = t.term - if len(term) > plen: - term = term[plen:] - if term.startswith(':'): term = term[1:] - r.add(term) - - # r holds the textual representation of the field's value set - # if the type of field or property needs conversion to a - # different python type this has to happen now - descriptor = self.datamodel.fields.get(property) - if descriptor: - kind = descriptor[1] - impl = model.propertyByKind(kind) - r = set([impl.set(i) for i in r]) - - return r - - def parse_query(self, query): - # accept standard web-query-like syntax - # 'this' -- match this - # 'this that' -- match this and that in document - # '"this that"' match the exact phrase 'this that' - # 'title:foo' match a document whose title contains 'foo' - # 'title:"A tale of two datastores"' exact title match - # '-this that' match that w/o this - - # limited support for wildcard searches - - qp = _xapian.QueryParser - - flags = (qp.FLAG_LOVEHATE) - - ri = self.read_index - start = 0 - end = len(query) - nextword = re.compile("(\S+)") - endquote = re.compile('(")') - queries = [] - while start < end: - m = nextword.match(query, start) - if not m: break - orig = start - field = None - start = m.end() + 1 - word = m.group(1) - if ':' in word: - # see if it's a field match - fieldname, w = word.split(':', 1) - if fieldname in self.fields: - field = fieldname - - word = w - - if word.startswith('"'): - qm = endquote.search(query, start) - if qm: - #XXX: strip quotes or
not here - #word = query[orig+1:qm.end(1)-1] - word = query[orig:qm.end(1)] - # this is a phrase modify the flags - flags |= qp.FLAG_PHRASE - start = qm.end(1) + 1 - - if field: - queries.append(ri.query_field(field, word)) - else: - if word.endswith("*"): - flags |= qp.FLAG_WILDCARD - q = self._query_parse(word, flags) - - queries.append(q) - q = ri.query_composite(ri.OP_AND, queries) - return q - - def _query_parse(self, word, flags=0, op=None): - # while newer secore do pass flags it doesn't allow control - # over them at the API level. We override here to support - # adding wildcard searching - ri = self.read_index - if op is None: op = ri.OP_AND - qp = ri._prepare_queryparser(None, None, op) - try: - return qp.parse_query(word, flags) - except _xapian.QueryParserError, e: - # If we got a parse error, retry without boolean operators (since - # these are the usual cause of the parse error). - return qp.parse_query(string, 0) diff --git a/tests/test_perf.py b/tests/test_perf.py new file mode 100644 index 0000000..d1e0269 --- /dev/null +++ b/tests/test_perf.py @@ -0,0 +1,109 @@ +# Copyright (C) 2007 One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import sys +import os +import unittest +import time +import tempfile +import shutil +from datetime import datetime + +import dbus + +DS_DBUS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" +DS_DBUS_PATH = "/org/laptop/sugar/DataStore" + +PROPS_WITHOUT_PREVIEW = {'activity_id': '37fa2f4013b17ae7fc6448f10fe5df53ef92de18', + 'title_set_by_user': '0', + 'title': 'Write Activity', + 'timestamp': int(time.time()), + 'activity': 'org.laptop.AbiWordActivity', + 'share-scope': 'private', + 'keep': 0, + 'icon-color': '#00588C,#00EA11', + 'mtime': datetime.now().isoformat(), + 'preview': '', + 'mime_type': ''} + +PROPS_WITH_PREVIEW = {'activity_id': 'e8594bea74faa80539d93ef1a10de3c712bb2eac', + 'title_set_by_user': '0', + 'title': 'Write Activity', + 'share-scope': 'private', + 'timestamp': int(time.time()), + 'activity': 'org.laptop.AbiWordActivity', + 'fulltext': 'mec mac', + 'keep': 0, + 'icon-color': '#00588C,#00EA11', + 'mtime': datetime.now().isoformat(), + 'preview': dbus.ByteArray('\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xd8\x00\x00\x00\xa2\x08\x02\x00\x00\x00\xac\xfb\x94\x1d\x00\x00\x00\x03sBIT\x08\x08\x08\xdb\xe1O\xe0\x00\x00\x03\x1dIDATx\x9c\xed\xd6\xbfJci\x00\xc6\xe1\xc4\xb5Qb\xb0Qa\xfc\xb3\x0cV\xda\xc8,\xa4\xf0^,\xbc;\xef@\x04kA\xb3(\x16b\x97 
3("\xc9\x14\'\x13\xc5\x9c\x9cq\x8bmg\xdcj\x93\x17\xf3<\xedw\x8a\xf7\x83\x1f\xe7\x9c\xfa\xd1\xd1\xd1\xc6\xc6F\r\xa6\xe4\xf1\xf1\xf1\xfe\xfe~~{{\xbb\xd5jM{\x0c\xb3\xeb\xe1\xe1\xa1\xddn\xcf\xbf\xf3D\xaf\xd7\xbb\xbd\xbdm4\x1aooo\xbb\xbb\xbb\x97\x97\x97;;;\xa3\xd1\xa8\xd3\xe9\xb4Z\xad\xaa\xaa...\x96\x96\x96\xca\xb2\xdc\xdf\xdf???_[[[^^\xbe\xb9\xb9\xd9\xdb\xdbk6\x9b\x13\xbb\t\x1f\xc0{!\xd6\xeb\xf5n\xb7\xfb\xf4\xf4\xb4\xb0\xb0p}}\xbd\xbe\xbe~rrR\x14\xc5\xc1\xc1AY\x96\x8dF\xe3\xea\xea\xaa\xaa\xaa\xc5\xc5\xc5V\xab\xd5\xe9tNOOWVV\x0e\x0f\x0f\x87\xc3\xe1\xc4.\xc0\xc70\xf7\xce\xd9`0(\xcb\xf2\xc7`\xf0\xbd\xdf\xffsk\xebgU\xfd\xf5\xe5K\xb3\xd9\xfc\xbb\xdd\xfecn\xee\xf9\xf9\xf9\xf5\xf5\xb5(\x8a\xe1pxww\xd7\xef\xf7\x8b\xa2X]]=;;\x9b\xd8z>\x8cz\xbb\xdd\xfe\xed?\xe2\xb7o\xb5\xaf_\x7f}\xf4\xe9S\xed\xf3\xe7\xffo\x16\xb3\xe3\xbf\xff\x11k\x9b\x9b\xb5\xcd\xcdI\xeda\xa6\xbd\xf7i\x86\x89\x11"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!2e\xe3\xf1x<\x1e\xcfO{\x06\xb3\xee\xf8\xf8\xb8\xd7\xeby#2e///UU\t\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\xccw\xbb\xdd\xd1h4\xed\x19\xcc\xae\x7f\xf3\xfb\x07q8\x9emk8\x97\xda\x00\x00\x00\x00IEND\xaeB`\x82'), + 'mime_type': 'application/vnd.oasis.opendocument.text'} + +def prepare_file(): + file_path = os.path.join(os.getcwd(), 'tests/funkyabi.odt') + f, tmp_path = tempfile.mkstemp() + os.close(f) + shutil.copyfile(file_path, tmp_path) + return tmp_path + +bus = dbus.SessionBus() +proxy = bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH) +data_store = dbus.Interface(proxy, DS_DBUS_INTERFACE) + +uids = [] + +n = 100 + +total = 0 +print 'Creating %r entries' % n +for i in range(n): + file_path = prepare_file() + t = time.time() + uids.append(data_store.create(PROPS_WITHOUT_PREVIEW, file_path, True)) + total += time.time() - t +print 'Created %r entries in %.2f ms. avg' % (n, float(total * 1000) / n) + +total = 0 +print 'Updating %r entries' % len(uids) +for uid in uids: + file_path = prepare_file() + t = time.time() + data_store.update(uid, PROPS_WITH_PREVIEW, file_path, True) + total += time.time() - t +print 'Updated %r entries in %.2f ms. avg' % (n, float(total * 1000) / len(uids)) + +total = 0 +print 'Retrieving %r full entries' % len(uids) +for uid in uids: + t = time.time() + props = data_store.get_properties(uid) + total += time.time() - t +print 'Retrieved %r full entries in %.2f ms. 
avg' % (n, float(total * 1000) / len(uids)) + +total = 0 +query = {'order_by': ['-mtime'], 'limit': 80} +properties = ['uid', 'title', 'mtime', 'timestamp', 'keep', 'buddies', + 'icon-color', 'mime_type', 'progress', 'activity', 'mountpoint', + 'activity_id'] +results, count = data_store.find(query, properties) +print 'Searching %r times on a journal with %d entries' % (n, count) +for i in range(n): + t = time.time() + results, count = data_store.find(query, properties) + total += time.time() - t +print 'Searched %r full entries in %.2f ms. avg' % (n, float(total * 1000) / n) + diff --git a/tests/test_sugar.py b/tests/test_sugar.py index 39a0e8c..1dbb607 100644 --- a/tests/test_sugar.py +++ b/tests/test_sugar.py @@ -34,9 +34,9 @@ DS_DBUS_PATH = "/org/laptop/sugar/DataStore" PROPS_WITHOUT_PREVIEW = {'activity_id': '37fa2f4013b17ae7fc6448f10fe5df53ef92de18', 'title_set_by_user': '0', 'title': 'Write Activity', - 'timestamp': int(time.time()), + 'timestamp': str(int(time.time())), 'activity': 'org.laptop.AbiWordActivity', - 'share-scope': 'private', + 'share-scope': 'private\nmoc', 'keep': '0', 'icon-color': '#00588C,#00EA11', 'mtime': datetime.now().isoformat(), @@ -47,7 +47,7 @@ PROPS_WITH_PREVIEW = {'activity_id': 'e8594bea74faa80539d93ef1a10de3c712bb2eac', 'title_set_by_user': '0', 'title': 'Write Activity', 'share-scope': 'private', - 'timestamp': int(time.time()), + 'timestamp': str(int(time.time())), 'activity': 'org.laptop.AbiWordActivity', 'fulltext': 'mec mac', 'keep': '0', @@ -82,7 +82,7 @@ class CommonTest(unittest.TestCase): query = {'order_by': ['-mtime'], 'limit': 80} t = time.time() - results, count = self._data_store.find(query, []) + results, count = self._data_store.find(query, ['uid', 'title']) t = time.time() - t return t @@ -106,12 +106,16 @@ class FunctionalityTest(CommonTest): def testresume(self): t, uid = self.create() props = self._data_store.get_properties(uid, byte_arrays=True) - #del props['uid'] + del props['uid'] + del props['mountpoint'] + del props['checksum'] assert props == PROPS_WITHOUT_PREVIEW t = self.update(uid) props = self._data_store.get_properties(uid, byte_arrays=True) - #del props['uid'] + del props['uid'] + del props['mountpoint'] + del props['checksum'] assert props == PROPS_WITH_PREVIEW file_name = self._data_store.get_filename(uid) @@ -120,6 +124,12 @@ class FunctionalityTest(CommonTest): f = open(file_name, 'r') f.close() + results, count = self._data_store.find({'uid': uid}, ['uid', 'title'], + byte_arrays=True) + assert count == 1 + assert results[0]['uid'] == uid + assert results[0]['title'] == 'Write Activity' + """ def testcustomproperties(self): t, uid = self.create() @@ -130,8 +140,7 @@ class FunctionalityTest(CommonTest): props = self._data_store.get_properties(uid) assert props['custom_property'] == 'test' - results, count = self._data_store.find({'custom_property': 'test'}, []) - assert count > 1 + results, count = self._data_store.find({'custom_property': 'test'}, ['custom_property']) for entry in results: assert entry['custom_property'] == 'test' uid = entry['uid'] @@ -140,7 +149,10 @@ class FunctionalityTest(CommonTest): """ def testfind(self): - t = self.find() + results, count = self._data_store.find({}, ['uid']) + assert count > 0 + + print self.find() class PerformanceTest(CommonTest): @@ -176,7 +188,7 @@ class PerformanceTest(CommonTest): if __name__ == '__main__': suite = unittest.TestSuite() - #suite.addTest(unittest.makeSuite(FunctionalityTest)) - suite.addTest(unittest.makeSuite(PerformanceTest)) + 
suite.addTest(unittest.makeSuite(FunctionalityTest)) + #suite.addTest(unittest.makeSuite(PerformanceTest)) unittest.TextTestRunner().run(suite)
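Every timing loop in tests/test_perf.py above follows the same average-latency pattern; factored out, it is roughly (a sketch, not part of the patch):

    import time

    def time_calls(label, func, args_list):
        # run func once per argument tuple, report mean wall-clock latency
        total = 0.0
        for args in args_list:
            t = time.time()
            func(*args)
            total += time.time() - t
        print('%s: %.2f ms. avg' % (label, total * 1000 / len(args_list)))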