author     Tomeu Vizoso <tomeu@tomeuvizoso.net>   2008-10-08 16:37:38 (GMT)
committer  Tomeu Vizoso <tomeu@tomeuvizoso.net>   2008-10-08 16:37:38 (GMT)
commit     9bd1bff4d8ab6247e5793c0b78b9bf2581773341 (patch)
tree       13a4914f0de789d3c6dd9acf70a273f6d34c9230
parent     e29723a33192cb27b1de83fd8e1abd110bbc1433 (diff)
parent     b1c4a254adc0573221c0d6661459248fd3c507ef (diff)
Merge branch 'master' of ../datastore2
Conflicts:
	src/olpc/datastore/backingstore.py
	src/olpc/datastore/datastore.py
	src/olpc/datastore/xapianindex.py
	tests/test_sugar.py
-rw-r--r--  Makefile.am  5
-rw-r--r--  README.txt  44
-rwxr-xr-x  autogen.sh  2
-rwxr-xr-x  bin/datastore-service  31
-rw-r--r--  configure.ac  6
-rw-r--r--  m4/python.m4  62
-rw-r--r--  secore/Makefile.am  12
-rw-r--r--  secore/__init__.py  30
-rw-r--r--  secore/datastructures.py  221
-rw-r--r--  secore/errors.py  40
-rw-r--r--  secore/fieldactions.py  358
-rw-r--r--  secore/fieldmappings.py  123
-rw-r--r--  secore/highlight.py  310
-rw-r--r--  secore/indexerconnection.py  382
-rw-r--r--  secore/marshall.py  73
-rw-r--r--  secore/parsedate.py  56
-rw-r--r--  secore/searchconnection.py  618
-rw-r--r--  src/olpc/datastore/Makefile.am  26
-rw-r--r--  src/olpc/datastore/__init__.py  4
-rw-r--r--  src/olpc/datastore/__version__.py  15
-rw-r--r--  src/olpc/datastore/backingstore.py  987
-rw-r--r--  src/olpc/datastore/bin_copy.py  27
-rw-r--r--  src/olpc/datastore/converter.py  165
-rw-r--r--  src/olpc/datastore/datastore.py  638
-rw-r--r--  src/olpc/datastore/filestore.py  205
-rw-r--r--  src/olpc/datastore/indexstore.py  234
-rw-r--r--  src/olpc/datastore/layoutmanager.py  101
-rw-r--r--  src/olpc/datastore/metadatareader.c  199
-rw-r--r--  src/olpc/datastore/metadatastore.py  50
-rw-r--r--  src/olpc/datastore/migration.py  81
-rw-r--r--  src/olpc/datastore/model.py  412
-rw-r--r--  src/olpc/datastore/optimizer.py  153
-rw-r--r--  src/olpc/datastore/utils.py  162
-rw-r--r--  src/olpc/datastore/xapianindex.py  575
-rw-r--r--  tests/test_perf.py  109
-rw-r--r--  tests/test_sugar.py  34
36 files changed, 1469 insertions(+), 5081 deletions(-)
diff --git a/Makefile.am b/Makefile.am
index 37d04c1..5fa2790 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,7 +1,10 @@
-SUBDIRS = bin etc secore src
+ACLOCAL_AMFLAGS = -I m4
+
+SUBDIRS = bin etc src
test:
@cd tests
$(MAKE) -C tests test
EXTRA_DIST = README.txt LICENSE.GPL
+
diff --git a/README.txt b/README.txt
index d2901d8..8b13789 100644
--- a/README.txt
+++ b/README.txt
@@ -1,45 +1 @@
-Datastore
----------
-A simple log like datastore able to connect with multiple
-backends. The datastore supports connectionig and disconnecting from
-backends on the fly to help the support the limit space/memory
-characteristics of the OLPC system and the fact that network services
-may become unavailable at times
-
-API
----
-
-For developer information see the doc tests in:
-
- src/olpc/datastore/tests/query.txt
-
-
-Dependencies
-------------
-
-xapian -- integrated full text indexing
- svn co svn://svn.xapian.org/xapian/trunk xapian
- currently this requires a checkout
-
-secore -- pythonic xapian binding -- include in disto but from
- http://flaxcode.googlecode.com/svn/trunk/libs/secore
-
-dbus -- command and control
-
-ore.main -- (optional) A command line application framework/shell
- used in bin/datasore. If you don't want to use this dep
- for now run bin/datasore-native
-
-
-
-Converters
-----------
-(used to index binaries)
-
-pdftotext from poppler-utils
-abiword/write
-
-
-Benjamin Saller
-Copyright ObjectRealms, LLC. 2007
diff --git a/autogen.sh b/autogen.sh
index 9bd6fd0..1cd5db4 100755
--- a/autogen.sh
+++ b/autogen.sh
@@ -1,3 +1,5 @@
#!/bin/sh
+export ACLOCAL="aclocal -I m4"
+
autoreconf -i
./configure "$@"
diff --git a/bin/datastore-service b/bin/datastore-service
index aba7112..850af9d 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -4,46 +4,26 @@ import gobject
import dbus.service
import dbus.mainloop.glib
import dbus.glib
-from olpc.datastore import DataStore, backingstore
+from olpc.datastore.datastore import DataStore
from sugar import logger
# Path handling
profile = os.environ.get('SUGAR_PROFILE', 'default')
base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
-repo_dir = os.path.join(base_dir, 'datastore')
-
-# operate from the repo directory
-if not os.path.exists(repo_dir): os.makedirs(repo_dir)
-
log_dir = os.path.join(base_dir, "logs")
if not os.path.exists(log_dir): os.makedirs(log_dir)
-#os.chdir(repo_dir)
-
# setup logger
logger.start('datastore')
-# check for old lockfiles, the rules here are that we can't be
-# connected to a tty. If we are not then in all likelyhood the process
-# was started automatically, which hopefully implies a single instance
-if not sys.stdin.isatty():
- lf = os.path.join(repo_dir, 'fulltext', 'flintlock')
- if os.path.exists(lf):
- logging.warning("Old lock file found -- removing.")
- os.unlink(lf)
-
-
# build the datastore
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
connected = True
ds = DataStore()
-ds.registerBackend(backingstore.FileBackingStore)
-ds.registerBackend(backingstore.InplaceFileBackingStore)
# and run it
-logging.info("Starting Datastore %s" % (repo_dir))
mainloop = gobject.MainLoop()
def handle_disconnect():
@@ -65,11 +45,6 @@ signal.signal(signal.SIGHUP, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
def main():
- if '-m' in sys.argv:
- # mount automatically for local testing
- ds.mount(repo_dir)
- ds.complete_indexing()
-
try:
mainloop.run()
except KeyboardInterrupt:
@@ -81,7 +56,3 @@ main()
ds.stop()
-#import hotshot
-#p = hotshot.Profile('hs.prof')
-#p.run('main()')
-
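With the lock-file handling, automatic mounting and backend registration stripped out, bin/datastore-service is reduced to the bare GLib/D-Bus service pattern: construct the service object on the session bus, install signal handlers, and run a main loop. A minimal, self-contained sketch of that pattern follows; the bus name, object path and Echo method are placeholders for illustration, not the real datastore interface.

    #!/usr/bin/env python
    # Skeleton of the GLib + D-Bus service pattern used by bin/datastore-service.
    # 'org.example.Service' and '/org/example/Service' are invented names.
    import signal
    import gobject
    import dbus
    import dbus.service
    import dbus.mainloop.glib

    class ExampleService(dbus.service.Object):
        def __init__(self, bus):
            name = dbus.service.BusName('org.example.Service', bus=bus)
            dbus.service.Object.__init__(self, name, '/org/example/Service')

        @dbus.service.method('org.example.Service',
                             in_signature='s', out_signature='s')
        def Echo(self, text):
            return text

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    service = ExampleService(dbus.SessionBus())
    mainloop = gobject.MainLoop()

    def handle_shutdown(signum, frame):
        # Quit the main loop so the process can clean up and exit.
        mainloop.quit()

    signal.signal(signal.SIGHUP, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        pass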
diff --git a/configure.ac b/configure.ac
index dbcc25d..1531c69 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,17 +2,21 @@ AC_INIT([sugar-datastore],[0.82.0],[],[sugar-datastore])
AC_PREREQ([2.59])
+AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([configure.ac])
AM_INIT_AUTOMAKE([1.9 foreign dist-bzip2 no-dist-gzip])
+AC_DISABLE_STATIC
+AC_PROG_LIBTOOL
+
AM_PATH_PYTHON
+AM_CHECK_PYTHON_HEADERS(,[AC_MSG_ERROR(could not find Python headers)])
AC_OUTPUT([
Makefile
bin/Makefile
etc/Makefile
-secore/Makefile
src/Makefile
src/olpc/Makefile
src/olpc/datastore/Makefile
diff --git a/m4/python.m4 b/m4/python.m4
new file mode 100644
index 0000000..e1c5266
--- /dev/null
+++ b/m4/python.m4
@@ -0,0 +1,62 @@
+## this one is commonly used with AM_PATH_PYTHONDIR ...
+dnl AM_CHECK_PYMOD(MODNAME [,SYMBOL [,ACTION-IF-FOUND [,ACTION-IF-NOT-FOUND]]])
+dnl Check if a module containing a given symbol is visible to python.
+AC_DEFUN([AM_CHECK_PYMOD],
+[AC_REQUIRE([AM_PATH_PYTHON])
+py_mod_var=`echo $1['_']$2 | sed 'y%./+-%__p_%'`
+AC_MSG_CHECKING(for ifelse([$2],[],,[$2 in ])python module $1)
+AC_CACHE_VAL(py_cv_mod_$py_mod_var, [
+ifelse([$2],[], [prog="
+import sys
+try:
+ import $1
+except ImportError:
+ sys.exit(1)
+except:
+ sys.exit(0)
+sys.exit(0)"], [prog="
+import $1
+$1.$2"])
+if $PYTHON -c "$prog" 1>&AC_FD_CC 2>&AC_FD_CC
+ then
+ eval "py_cv_mod_$py_mod_var=yes"
+ else
+ eval "py_cv_mod_$py_mod_var=no"
+ fi
+])
+py_val=`eval "echo \`echo '$py_cv_mod_'$py_mod_var\`"`
+if test "x$py_val" != xno; then
+ AC_MSG_RESULT(yes)
+ ifelse([$3], [],, [$3
+])dnl
+else
+ AC_MSG_RESULT(no)
+ ifelse([$4], [],, [$4
+])dnl
+fi
+])
+
+dnl a macro to check for ability to create python extensions
+dnl AM_CHECK_PYTHON_HEADERS([ACTION-IF-POSSIBLE], [ACTION-IF-NOT-POSSIBLE])
+dnl function also defines PYTHON_INCLUDES
+AC_DEFUN([AM_CHECK_PYTHON_HEADERS],
+[AC_REQUIRE([AM_PATH_PYTHON])
+AC_MSG_CHECKING(for headers required to compile python extensions)
+dnl deduce PYTHON_INCLUDES
+py_prefix=`$PYTHON -c "import sys; print sys.prefix"`
+py_exec_prefix=`$PYTHON -c "import sys; print sys.exec_prefix"`
+PYTHON_INCLUDES="-I${py_prefix}/include/python${PYTHON_VERSION}"
+if test "$py_prefix" != "$py_exec_prefix"; then
+ PYTHON_INCLUDES="$PYTHON_INCLUDES -I${py_exec_prefix}/include/python${PYTHON_VERSION}"
+fi
+AC_SUBST(PYTHON_INCLUDES)
+dnl check if the headers exist:
+save_CPPFLAGS="$CPPFLAGS"
+CPPFLAGS="$CPPFLAGS $PYTHON_INCLUDES"
+AC_TRY_CPP([#include <Python.h>],dnl
+[AC_MSG_RESULT(found)
+$1],dnl
+[AC_MSG_RESULT(not found)
+$2])
+CPPFLAGS="$save_CPPFLAGS"
+])
diff --git a/secore/Makefile.am b/secore/Makefile.am
deleted file mode 100644
index 393ba8f..0000000
--- a/secore/Makefile.am
+++ /dev/null
@@ -1,12 +0,0 @@
-datastoredir = $(pythondir)/secore
-datastore_PYTHON = \
- __init__.py \
- datastructures.py \
- fieldmappings.py \
- searchconnection.py \
- errors.py \
- highlight.py \
- marshall.py \
- fieldactions.py \
- indexerconnection.py \
- parsedate.py
diff --git a/secore/__init__.py b/secore/__init__.py
deleted file mode 100644
index 157fea4..0000000
--- a/secore/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-"""Search engine Core.
-
-See the accompanying documentation for details. In particular, there should be
-an accompanying file "introduction.html" (or "introduction.rst") which gives
-details of how to use the secore package.
-
-"""
-__docformat__ = "restructuredtext en"
-
-from datastructures import *
-from errors import *
-from indexerconnection import *
-from searchconnection import *
diff --git a/secore/datastructures.py b/secore/datastructures.py
deleted file mode 100644
index 588ac06..0000000
--- a/secore/datastructures.py
+++ /dev/null
@@ -1,221 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""datastructures.py: Datastructures for search engine core.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import xapian as _xapian
-import cPickle as _cPickle
-
-class Field(object):
- # Use __slots__ because we're going to have very many Field objects in
- # typical usage.
- __slots__ = 'name', 'value'
-
- def __init__(self, name, value):
- self.name = name
- self.value = value
-
- def __repr__(self):
- return 'Field(%r, %r)' % (self.name, self.value)
-
-class UnprocessedDocument(object):
- """A unprocessed document to be passed to the indexer.
-
- This represents an item to be processed and stored in the search engine.
- Each document will be processed by the indexer to generate a
- ProcessedDocument, which can then be stored in the search engine index.
-
- Note that some information in an UnprocessedDocument will not be
- represented in the ProcessedDocument: therefore, it is not possible to
- retrieve an UnprocessedDocument from the search engine index.
-
- An unprocessed document is a simple container with two attributes:
-
- - `fields` is a list of Field objects.
- - `id` is a string holding a unique identifier for the document (or
- None to get the database to allocate a unique identifier automatically
- when the document is added).
-
- """
-
- __slots__ = 'id', 'fields',
- def __init__(self, id=None, fields=None):
- self.id = id
- if fields is None:
- self.fields = []
- else:
- self.fields = fields
-
- def __repr__(self):
- return 'UnprocessedDocument(%r, %r)' % (self.id, self.fields)
-
-class ProcessedDocument(object):
- """A processed document, as stored in the index.
-
- This represents an item which is ready to be stored in the search engine,
- or which has been returned by the search engine.
-
- """
-
- __slots__ = '_doc', '_fieldmappings', '_data', '_id'
- def __init__(self, fieldmappings, xapdoc=None):
- """Create a ProcessedDocument.
-
- `fieldmappings` is the configuration from a database connection used lookup
- the configuration to use to store each field.
-
- If supplied, `xapdoc` is a Xapian document to store in the processed
- document. Otherwise, a new Xapian document is created.
-
- """
- if xapdoc is None:
- self._doc = _xapian.Document()
- else:
- self._doc = xapdoc
- self._fieldmappings = fieldmappings
- self._data = None
- self._id = None
-
- def add_term(self, field, term, wdfinc=1, positions=None):
- """Add a term to the document.
-
- Terms are the main unit of information used for performing searches.
-
- - `field` is the field to add the term to.
- - `term` is the term to add.
- - `wdfinc` is the value to increase the within-document-frequency
- measure for the term by.
- - `positions` is the positional information to add for the term.
- This may be None to indicate that there is no positional information,
- or may be an integer to specify one position, or may be a sequence of
- integers to specify several positions. (Note that the wdf is not
- increased automatically for each position: if you add a term at 7
- positions, and the wdfinc value is 2, the total wdf for the term will
- only be increased by 2, not by 14.)
-
- """
- prefix = self._fieldmappings.get_prefix(field)
- if len(term) > 0:
- # We use the following check, rather than "isupper()" to ensure
- # that we match the check performed by the queryparser, regardless
- # of our locale.
- if ord(term[0]) >= ord('A') and ord(term[0]) <= ord('Z'):
- prefix = prefix + ':'
- if positions is None:
- self._doc.add_term(prefix + term, wdfinc)
- elif isinstance(positions, int):
- self._doc.add_posting(prefix + term, positions, wdfinc)
- else:
- self._doc.add_term(prefix + term, wdfinc)
- for pos in positions:
- self._doc.add_posting(prefix + term, pos, 0)
-
- def add_value(self, field, value):
- """Add a value to the document.
-
- Values are additional units of information used when performing
- searches. Note that values are _not_ intended to be used to store
- information for display in the search results - use the document data
- for that. The intention is that as little information as possible is
- stored in values, so that they can be accessed as quickly as possible
- during the search operation.
-
- Unlike terms, each document may have at most one value in each field
- (whereas there may be an arbitrary number of terms in a given field).
- If an attempt to add multiple values to a single field is made, only
- the last value added will be stored.
-
- """
- slot = self._fieldmappings.get_slot(field)
- self._doc.add_value(slot, value)
-
- def get_value(self, field):
- """Get a value from the document.
-
- """
- slot = self._fieldmappings.get_slot(field)
- return self._doc.get_value(slot)
-
- def prepare(self):
- """Prepare the document for adding to a xapian database.
-
- This updates the internal xapian document with any changes which have
- been made, and then returns it.
-
- """
- if self._data is not None:
- self._doc.set_data(_cPickle.dumps(self._data, 2))
- self._data = None
- return self._doc
-
- def _get_data(self):
- if self._data is None:
- rawdata = self._doc.get_data()
- if rawdata == '':
- self._data = {}
- else:
- self._data = _cPickle.loads(rawdata)
- return self._data
- def _set_data(self, data):
- if not isinstance(data, dict):
- raise TypeError("Cannot set data to any type other than a dict")
- self._data = data
- data = property(_get_data, _set_data, doc=
- """The data stored in this processed document.
-
- This data is a dictionary of entries, where the key is a fieldname, and the
- value is a list of strings.
-
- """)
-
- def _get_id(self):
- if self._id is None:
- tl = self._doc.termlist()
- try:
- term = tl.skip_to('Q').term
- if len(term) == 0 or term[0] != 'Q':
- return None
- self._id = term[1:]
- except StopIteration:
- self._id = None
- return None
-
- return self._id
- def _set_id(self, id):
- tl = self._doc.termlist()
- try:
- term = tl.skip_to('Q').term
- except StopIteration:
- term = ''
- if len(term) != 0 and term[0] == 'Q':
- self._doc.remove_term(term)
- if id is not None:
- self._doc.add_term('Q' + id, 0)
- id = property(_get_id, _set_id, doc=
- """The unique ID for this document.
-
- """)
-
- def __repr__(self):
- return '<ProcessedDocument(%r)>' % (self.id)
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod (sys.modules[__name__])
diff --git a/secore/errors.py b/secore/errors.py
deleted file mode 100644
index b6ad00f..0000000
--- a/secore/errors.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""errors.py: Exceptions for the search engine core.
-
-"""
-__docformat__ = "restructuredtext en"
-
-class SearchEngineError(Exception):
- r"""Base class for exceptions thrown by the search engine.
-
- Any errors generated by the python level interface to xapian will be
- instances of this class or its subclasses.
-
- """
-
-class IndexerError(SearchEngineError):
- r"""Class used to report errors from the indexing API.
-
- """
-
-class SearchError(SearchEngineError):
- r"""Class used to report errors from the search API.
-
- """
-
diff --git a/secore/fieldactions.py b/secore/fieldactions.py
deleted file mode 100644
index c595f0b..0000000
--- a/secore/fieldactions.py
+++ /dev/null
@@ -1,358 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""fieldactions.py: Definitions and implementations of field actions.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import errors as _errors
-import marshall as _marshall
-import xapian as _xapian
-import parsedate as _parsedate
-
-def _act_store_content(fieldname, doc, value, context):
- """Perform the STORE_CONTENT action.
-
- """
- try:
- fielddata = doc.data[fieldname]
- except KeyError:
- fielddata = []
- doc.data[fieldname] = fielddata
- fielddata.append(value)
-
-def _act_index_exact(fieldname, doc, value, context):
- """Perform the INDEX_EXACT action.
-
- """
- doc.add_term(fieldname, value, 0)
-
-def _act_index_freetext(fieldname, doc, value, context, weight=1,
- language=None, stop=None, spell=False,
- nopos=False, noprefix=False):
- """Perform the INDEX_FREETEXT action.
-
- """
- termgen = _xapian.TermGenerator()
- if language is not None:
- termgen.set_stemmer(_xapian.Stem(language))
-
- if stop is not None:
- stopper = _xapian.SimpleStopper()
- for term in stop:
- stopper.add (term)
- termgen.set_stopper (stopper)
-
- if spell:
- termgen.set_database(context.index)
- termgen.set_flags(termgen.FLAG_SPELLING)
-
- termgen.set_document(doc._doc)
- termgen.set_termpos(context.current_position)
- if nopos:
- termgen.index_text_without_positions(value, weight, '')
- else:
- termgen.index_text(value, weight, '')
-
- if not noprefix:
- # Store a second copy of the term with a prefix, for field-specific
- # searches.
- prefix = doc._fieldmappings.get_prefix(fieldname)
- if len(prefix) != 0:
- termgen.set_termpos(context.current_position)
- if nopos:
- termgen.index_text_without_positions(value, weight, prefix)
- else:
- termgen.index_text(value, weight, prefix)
-
- # Add a gap between each field instance, so that phrase searches don't
- # match across instances.
- termgen.increase_termpos(10)
- context.current_position = termgen.get_termpos()
-
-class SortableMarshaller(object):
- """Implementation of marshalling for sortable values.
-
- """
- def __init__(self, indexing=True):
- if indexing:
- self._err = _errors.IndexerError
- else:
- self._err = _errors.SearchError
-
- def marshall_string(self, fieldname, value):
- """Marshall a value for sorting in lexicograpical order.
-
- This returns the input as the output, since strings already sort in
- lexicographical order.
-
- """
- return value
-
- def marshall_float(self, fieldname, value):
- """Marshall a value for sorting as a floating point value.
-
- """
- # convert the value to a float
- try:
- value = float(value)
- except ValueError:
- raise self._err("Value supplied to field %r must be a "
- "valid floating point number: was %r" %
- (fieldname, value))
- return _marshall.float_to_string(value)
-
- def marshall_date(self, fieldname, value):
- """Marshall a value for sorting as a date.
-
- """
- try:
- value = _parsedate.date_from_string(value)
- except ValueError, e:
- raise self._err("Value supplied to field %r must be a "
- "valid date: was %r: error is '%s'" %
- (fieldname, value, str(e)))
- return _marshall.date_to_string(value)
-
- def get_marshall_function(self, fieldname, sorttype):
- """Get a function used to marshall values of a given sorttype.
-
- """
- try:
- return {
- None: self.marshall_string,
- 'string': self.marshall_string,
- 'float': self.marshall_float,
- 'date': self.marshall_date,
- }[sorttype]
- except KeyError:
- raise self._err("Unknown sort type %r for field %r" %
- (sorttype, fieldname))
-
-
-def _act_sort_and_collapse(fieldname, doc, value, context, type=None):
- """Perform the SORTABLE action.
-
- """
- marshaller = SortableMarshaller()
- fn = marshaller.get_marshall_function(fieldname, type)
- value = fn(fieldname, value)
- doc.add_value(fieldname, value)
-
-class ActionContext(object):
- """The context in which an action is performed.
-
- This is just used to pass term generators, word positions, and the like
- around.
-
- """
- def __init__(self, index):
- self.current_language = None
- self.current_position = 0
- self.index = index
-
-class FieldActions(object):
- """An object describing the actions to be performed on a field.
-
- The supported actions are:
-
- - `STORE_CONTENT`: store the unprocessed content of the field in the search
- engine database. All fields which need to be displayed or used when
- displaying the search results need to be given this action.
-
- - `INDEX_EXACT`: index the exact content of the field as a single search
- term. Fields whose contents need to be searchable as an "exact match"
- need to be given this action.
-
- - `INDEX_FREETEXT`: index the content of this field as text. The content
- will be split into terms, allowing free text searching of the field. Four
- optional parameters may be supplied:
-
- - 'weight' is a multiplier to apply to the importance of the field. This
- must be an integer, and the default value is 1.
- - 'language' is the language to use when processing the field. This can
- be expressed as an ISO 2-letter language code. The supported languages
- are those supported by the xapian core in use.
- - 'stop' is an iterable of stopwords to filter out of the generated
- terms. Note that due to Xapian design, only non-positional terms are
- affected, so this is of limited use.
- - 'spell' is a boolean flag - if true, the contents of the field will be
- used for spelling correction.
- - 'nopos' is a boolean flag - if true, positional information is not
- stored.
- - 'noprefix' is a boolean flag - if true, prevents terms with the field
- prefix being generated. This means that searches specific to this
- field will not work, and thus should only be used for special cases.
-
- - `SORTABLE`: index the content of the field such that it can be used to
- sort result sets. It also allows result sets to be restricted to those
- documents with a field values in a given range. One optional parameter
- may be supplied:
-
- - 'type' is a value indicating how to sort the field. It has several
- possible values:
-
- - 'string' - sort in lexicographic (ie, alphabetical) order.
- This is the default, used if no type is set.
- - 'float' - treat the values as (decimal representations of) floating
- point numbers, and sort in numerical order . The values in the field
- must be valid floating point numbers (according to Python's float()
- function).
- - 'date' - sort in date order. The values must be valid dates (either
- Python datetime.date objects, or ISO 8601 format (ie, YYYYMMDD or
- YYYY-MM-DD).
-
- - `COLLAPSE`: index the content of the field such that it can be used to
- "collapse" result sets, such that only the highest result with each value
- of the field will be returned.
-
- """
-
- # See the class docstring for the meanings of the following constants.
- STORE_CONTENT = 1
- INDEX_EXACT = 2
- INDEX_FREETEXT = 3
- SORTABLE = 4
- COLLAPSE = 5
-
- # Sorting and collapsing store the data in a value, but the format depends
- # on the sort type. Easiest way to implement is to treat them as the same
- # action.
- SORT_AND_COLLAPSE = -1
-
- # NEED_SLOT is a flag used to indicate that an action needs a slot number
- NEED_SLOT = 1
- # NEED_PREFIX is a flag used to indicate that an action needs a prefix
- NEED_PREFIX = 2
-
- def __init__(self, fieldname):
- # Dictionary of actions, keyed by type.
- self._actions = {}
- self._fieldname = fieldname
-
- def add(self, field_mappings, action, **kwargs):
- """Add an action to perform on a field.
-
- """
- if action not in (FieldActions.STORE_CONTENT,
- FieldActions.INDEX_EXACT,
- FieldActions.INDEX_FREETEXT,
- FieldActions.SORTABLE,
- FieldActions.COLLAPSE,):
- raise _errors.IndexerError("Unknown field action: %r" % action)
-
- info = self._action_info[action]
-
- # Check parameter names
- for key in kwargs.keys():
- if key not in info[1]:
- raise _errors.IndexerError("Unknown parameter name for action %r: %r" % (info[0], key))
-
- # Fields cannot be indexed both with "EXACT" and "FREETEXT": whilst we
- # could implement this, the query parser wouldn't know what to do with
- # searches.
- if action == FieldActions.INDEX_EXACT:
- if FieldActions.INDEX_FREETEXT in self._actions:
- raise _errors.IndexerError("Field %r is already marked for indexing "
- "as free text: cannot mark for indexing "
- "as exact text as well" % self._fieldname)
- if action == FieldActions.INDEX_FREETEXT:
- if FieldActions.INDEX_EXACT in self._actions:
- raise _errors.IndexerError("Field %r is already marked for indexing "
- "as exact text: cannot mark for indexing "
- "as free text as well" % self._fieldname)
-
- # Fields cannot be indexed as more than one type for "SORTABLE": to
- # implement this, we'd need to use a different prefix for each sortable
- # type, but even then the search end wouldn't know what to sort on when
- # searching. Also, if they're indexed as "COLLAPSE", the value must be
- # stored in the right format for the type "SORTABLE".
- if action == FieldActions.SORTABLE or action == FieldActions.COLLAPSE:
- if action == FieldActions.COLLAPSE:
- sorttype = None
- else:
- try:
- sorttype = kwargs['type']
- except KeyError:
- sorttype = 'string'
- kwargs['type'] = sorttype
- action = FieldActions.SORT_AND_COLLAPSE
-
- try:
- oldsortactions = self._actions[FieldActions.SORT_AND_COLLAPSE]
- except KeyError:
- oldsortactions = ()
-
- if len(oldsortactions) > 0:
- for oldsortaction in oldsortactions:
- oldsorttype = oldsortaction['type']
-
- if sorttype == oldsorttype or oldsorttype is None:
- # Use new type
- self._actions[action] = []
- elif sorttype is None:
- # Use old type
- return
- else:
- raise _errors.IndexerError("Field %r is already marked for "
- "sorting, with a different "
- "sort type" % self._fieldname)
-
- if self.NEED_PREFIX in info[3]:
- field_mappings.add_prefix(self._fieldname)
- if self.NEED_SLOT in info[3]:
- field_mappings.add_slot(self._fieldname)
-
- # Make an entry for the action
- if action not in self._actions:
- self._actions[action] = []
-
- # Check for repetitions of actions
- for old_action in self._actions[action]:
- if old_action == kwargs:
- return
-
- # Append the action to the list of actions
- self._actions[action].append(kwargs)
-
- def perform(self, doc, value, context):
- """Perform the actions on the field.
-
- - `doc` is a ProcessedDocument to store the result of the actions in.
- - `value` is a string holding the value of the field.
- - `context` is an ActionContext object used to keep state in.
-
- """
- for type, actionlist in self._actions.iteritems():
- info = self._action_info[type]
- for kwargs in actionlist:
- info[2](self._fieldname, doc, value, context, **kwargs)
-
- _action_info = {
- STORE_CONTENT: ('STORE_CONTENT', (), _act_store_content, (), ),
- INDEX_EXACT: ('INDEX_EXACT', (), _act_index_exact, (NEED_PREFIX,), ),
- INDEX_FREETEXT: ('INDEX_FREETEXT', ('weight', 'language', 'stop', 'spell', 'nopos', 'noprefix', ),
- _act_index_freetext, (NEED_PREFIX, ), ),
- SORTABLE: ('SORTABLE', ('type', ), None, (NEED_SLOT,), ),
- COLLAPSE: ('COLLAPSE', (), None, (NEED_SLOT,), ),
- SORT_AND_COLLAPSE: ('SORT_AND_COLLAPSE', ('type', ), _act_sort_and_collapse, (NEED_SLOT,), ),
- }
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod (sys.modules[__name__])
diff --git a/secore/fieldmappings.py b/secore/fieldmappings.py
deleted file mode 100644
index 3838ce5..0000000
--- a/secore/fieldmappings.py
+++ /dev/null
@@ -1,123 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""fieldmappings.py: Mappings from field names to term prefixes, etc.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import cPickle as _cPickle
-
-class FieldMappings(object):
- """Mappings from field names to term prefixes, slot values, etc.
-
- The following mappings are maintained:
-
- - a mapping from field name to the string prefix to insert at the start of
- terms.
- - a mapping from field name to the slot numbers to store the field contents
- in.
-
- """
- __slots__ = '_prefixes', '_prefixcount', '_slots', '_slotcount',
-
- def __init__(self, serialised=None):
- """Create a new field mapping object, or unserialise a saved one.
-
- """
- if serialised is not None:
- (self._prefixes, self._prefixcount,
- self._slots, self._slotcount) = _cPickle.loads(serialised)
- else:
- self._prefixes = {}
- self._prefixcount = 0
- self._slots = {}
- self._slotcount = 0
-
- def _genPrefix(self):
- """Generate a previously unused prefix.
-
- Prefixes are uppercase letters, and start with 'X' (this is a Xapian
- convention, for compatibility with other Xapian tools: other starting
- letters are reserved for special meanings):
-
- >>> maps = FieldMappings()
- >>> maps._genPrefix()
- 'XA'
- >>> maps._genPrefix()
- 'XB'
- >>> [maps._genPrefix() for i in xrange(60)]
- ['XC', 'XD', 'XE', 'XF', 'XG', 'XH', 'XI', 'XJ', 'XK', 'XL', 'XM', 'XN', 'XO', 'XP', 'XQ', 'XR', 'XS', 'XT', 'XU', 'XV', 'XW', 'XX', 'XY', 'XZ', 'XAA', 'XBA', 'XCA', 'XDA', 'XEA', 'XFA', 'XGA', 'XHA', 'XIA', 'XJA', 'XKA', 'XLA', 'XMA', 'XNA', 'XOA', 'XPA', 'XQA', 'XRA', 'XSA', 'XTA', 'XUA', 'XVA', 'XWA', 'XXA', 'XYA', 'XZA', 'XAB', 'XBB', 'XCB', 'XDB', 'XEB', 'XFB', 'XGB', 'XHB', 'XIB', 'XJB']
- >>> maps = FieldMappings()
- >>> [maps._genPrefix() for i in xrange(27*26 + 5)][-10:]
- ['XVZ', 'XWZ', 'XXZ', 'XYZ', 'XZZ', 'XAAA', 'XBAA', 'XCAA', 'XDAA', 'XEAA']
- """
- res = []
- self._prefixcount += 1
- num = self._prefixcount
- while num != 0:
- ch = (num - 1) % 26
- res.append(chr(ch + ord('A')))
- num -= ch
- num = num // 26
- return 'X' + ''.join(res)
-
- def get_prefix(self, fieldname):
- """Get the prefix used for a given field name.
-
- """
- return self._prefixes[fieldname]
-
- def get_slot(self, fieldname):
- """Get the slot number used for a given field name.
-
- """
- return self._slots[fieldname]
-
- def add_prefix(self, fieldname):
- """Allocate a prefix for the given field.
-
- If a prefix is already allocated for this field, this has no effect.
-
- """
- if fieldname in self._prefixes:
- return
- self._prefixes[fieldname] = self._genPrefix()
-
- def add_slot(self, fieldname):
- """Allocate a slot number for the given field.
-
- If a slot number is already allocated for this field, this has no effect.
-
- """
- if fieldname in self._slots:
- return
- self._slots[fieldname] = self._slotcount
- self._slotcount += 1
-
- def serialise(self):
- """Serialise the field mappings to a string.
-
- This can be unserialised by passing the result of this method to the
- constructor of a new FieldMappings object.
-
- """
- return _cPickle.dumps((self._prefixes,
- self._prefixcount,
- self._slots,
- self._slotcount,
- ), 2)
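The doctest above shows how prefixes are allocated ('XA', 'XB', ... from a base-26 counter); the only other moving part of the removed FieldMappings class is the pickle-based round trip between serialise() and the constructor. A small sketch, using made-up field names:

    # Round-trip of the removed secore FieldMappings configuration.
    # 'title' and 'date' are arbitrary example field names.
    from secore.fieldmappings import FieldMappings

    maps = FieldMappings()
    maps.add_prefix('title')    # first prefix allocated is 'XA'
    maps.add_slot('date')       # first slot allocated is 0

    restored = FieldMappings(maps.serialise())
    assert restored.get_prefix('title') == 'XA'
    assert restored.get_slot('date') == 0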
diff --git a/secore/highlight.py b/secore/highlight.py
deleted file mode 100644
index 38f2050..0000000
--- a/secore/highlight.py
+++ /dev/null
@@ -1,310 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""highlight.py: Highlight and summarise text.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import re
-import xapian
-
-class Highlighter(object):
- """Class for highlighting text and creating contextual summaries.
-
- >>> hl = Highlighter("en")
- >>> hl.makeSample('Hello world.', ['world'])
- 'Hello world.'
- >>> hl.highlight('Hello world', ['world'], ('<', '>'))
- 'Hello <world>'
-
- """
-
- # split string into words, spaces, punctuation and markup tags
- _split_re = re.compile(
- '</\\w+>|<\\w+(?:\\s*\\w+="[^"]*"|\\s*\\w+)*\\s*>|[\\w\']+|\\s+|[^\\w\'\\s<>/]+')
-
- def __init__(self, language_code='en', stemmer=None):
- """Create a new highlighter for the specified language.
-
- """
- if stemmer is not None:
- self.stem = stemmer
- else:
- self.stem = xapian.Stem(language_code)
-
- def _split_text(self, text, strip_tags=False):
- """Split some text into words and non-words.
-
- - `text` is the text to process. It may be a unicode object or a utf-8
- encoded simple string.
- - `strip_tags` is a flag - False to keep tags, True to strip all tags
- from the output.
-
- Returns a list of utf-8 encoded simple strings.
-
- """
- if isinstance(text, unicode):
- text = text.encode('utf-8')
-
- words = self._split_re.findall(text)
- if strip_tags:
- return [w for w in words if w[0] != '<']
- else:
- return words
-
- def _strip_prefix(self, term):
- """Strip the prefix off a term.
-
- Prefixes are any initial capital letters, with the exception that R always
- ends a prefix, even if followed by capital letters.
-
- >>> hl = Highlighter("en")
- >>> print hl._strip_prefix('hello')
- hello
- >>> print hl._strip_prefix('Rhello')
- hello
- >>> print hl._strip_prefix('XARHello')
- Hello
- >>> print hl._strip_prefix('XAhello')
- hello
- >>> print hl._strip_prefix('XAh')
- h
- >>> print hl._strip_prefix('XA')
- <BLANKLINE>
-
- """
- for p in xrange(len(term)):
- if term[p].islower():
- return term[p:]
- elif term[p] == 'R':
- return term[p+1:]
- return ''
-
- def _query_to_stemmed_words(self, query):
- """Convert a query to a list of stemmed words.
-
- - `query` is the query to parse: it may be xapian.Query object, or a
- sequence of terms.
-
- """
- if isinstance(query, xapian.Query):
- return [self._strip_prefix(t) for t in query]
- else:
- return [self.stem(q.lower()) for q in query]
-
-
- def makeSample(self, text, query, maxlen=600, hl=None):
- """Make a contextual summary from the supplied text.
-
- This basically works by splitting the text into phrases, counting the query
- terms in each, and keeping those with the most.
-
- Any markup tags in the text will be stripped.
-
- `text` is the source text to summarise.
- `query` is either a Xapian query object or a list of (unstemmed) term strings.
- `maxlen` is the maximum length of the generated summary.
- `hl` is a pair of strings to insert around highlighted terms, e.g. ('<b>', '</b>')
-
- """
-
- words = self._split_text(text, True)
- terms = self._query_to_stemmed_words(query)
-
- # build blocks delimited by puncuation, and count matching words in each block
- # blocks[n] is a block [firstword, endword, charcount, termcount, selected]
- blocks = []
- start = end = count = blockchars = 0
-
- while end < len(words):
- blockchars += len(words[end])
- if words[end].isalnum():
- if self.stem(words[end].lower()) in terms:
- count += 1
- end += 1
- elif words[end] in ',.;:?!\n':
- end += 1
- blocks.append([start, end, blockchars, count, False])
- start = end
- blockchars = 0
- count = 0
- else:
- end += 1
- if start != end:
- blocks.append([start, end, blockchars, count, False])
- if len(blocks) == 0:
- return ''
-
- # select high-scoring blocks first, down to zero-scoring
- chars = 0
- for count in xrange(3, -1, -1):
- for b in blocks:
- if b[3] >= count:
- b[4] = True
- chars += b[2]
- if chars >= maxlen: break
- if chars >= maxlen: break
-
- # assemble summary
- words2 = []
- lastblock = -1
- for i, b in enumerate(blocks):
- if b[4]:
- if i != lastblock + 1:
- words2.append('..')
- words2.extend(words[b[0]:b[1]])
- lastblock = i
-
- if not blocks[-1][4]:
- words2.append('..')
-
- # trim down to maxlen
- l = 0
- for i in xrange (len (words2)):
- l += len (words2[i])
- if l >= maxlen:
- words2[i:] = ['..']
- break
-
- if hl is None:
- return ''.join(words2)
- else:
- return self._hl(words2, terms, hl)
-
- def highlight(self, text, query, hl, strip_tags=False):
- """Add highlights (string prefix/postfix) to a string.
-
- `text` is the source to highlight.
- `query` is either a Xapian query object or a list of (unstemmed) term strings.
- `hl` is a pair of highlight strings, e.g. ('<i>', '</i>')
- `strip_tags` strips HTML markout iff True
-
- >>> hl = Highlighter()
- >>> qp = xapian.QueryParser()
- >>> q = qp.parse_query('cat dog')
- >>> tags = ('[[', ']]')
- >>> hl.highlight('The cat went Dogging; but was <i>dog tired</i>.', q, tags)
- 'The [[cat]] went [[Dogging]]; but was <i>[[dog]] tired</i>.'
-
- """
- words = self._split_text(text, strip_tags)
- terms = self._query_to_stemmed_words(query)
- return self._hl(words, terms, hl)
-
- def _hl(self, words, terms, hl):
- """Add highlights to a list of words.
-
- `words` is the list of words and non-words to be highlighted..
- `terms` is the list of stemmed words to look for.
-
- """
- for i, w in enumerate(words):
- if self.stem(words[i].lower()) in terms:
- words[i] = ''.join((hl[0], w, hl[1]))
-
- return ''.join(words)
-
-
-__test__ = {
- 'no_punc': r'''
-
- Test the highlighter's behaviour when there is no punctuation in the sample
- text (regression test - used to return no output):
- >>> hl = Highlighter("en")
- >>> hl.makeSample('Hello world', ['world'])
- 'Hello world'
-
- ''',
-
- 'stem_levels': r'''
-
- Test highlighting of words, and how it works with stemming:
- >>> hl = Highlighter("en")
-
- # "word" and "wording" stem to "word", so the following 4 calls all return
- # the same thing
- >>> hl.makeSample('Hello. word. wording. wordinging.', ['word'], hl='<>')
- 'Hello. <word>. <wording>. wordinging.'
- >>> hl.highlight('Hello. word. wording. wordinging.', ['word'], '<>')
- 'Hello. <word>. <wording>. wordinging.'
- >>> hl.makeSample('Hello. word. wording. wordinging.', ['wording'], hl='<>')
- 'Hello. <word>. <wording>. wordinging.'
- >>> hl.highlight('Hello. word. wording. wordinging.', ['wording'], '<>')
- 'Hello. <word>. <wording>. wordinging.'
-
- # "wordinging" stems to "wording", so only the last word is highlighted for
- # this one.
- >>> hl.makeSample('Hello. word. wording. wordinging.', ['wordinging'], hl='<>')
- 'Hello. word. wording. <wordinging>.'
- >>> hl.highlight('Hello. word. wording. wordinging.', ['wordinging'], '<>')
- 'Hello. word. wording. <wordinging>.'
- ''',
-
- 'supplied_stemmer': r'''
-
- Test behaviour if we pass in our own stemmer:
- >>> stem = xapian.Stem('en')
- >>> hl = Highlighter(stemmer=stem)
- >>> hl.highlight('Hello. word. wording. wordinging.', ['word'], '<>')
- 'Hello. <word>. <wording>. wordinging.'
-
- ''',
-
- 'unicode': r'''
-
- Test behaviour if we pass in unicode input:
- >>> hl = Highlighter('en')
- >>> hl.highlight(u'Hello\xf3. word. wording. wordinging.', ['word'], '<>')
- 'Hello\xc3\xb3. <word>. <wording>. wordinging.'
-
- ''',
-
- 'no_sample': r'''
-
- Test behaviour if we pass in unicode input:
- >>> hl = Highlighter('en')
- >>> hl.makeSample(u'', ['word'])
- ''
-
- ''',
-
- 'short_samples': r'''
-
- >>> hl = Highlighter('en')
- >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['hello'], 20, ('<', '>'))
- '.. <Hello> world ..'
- >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['hello'], 40, ('<', '>'))
- 'A boring start. <Hello> world indeed...'
- >>> hl.makeSample("A boring start. Hello world indeed. A boring end.", ['boring'], 40, ('<', '>'))
- 'A <boring> start... A <boring> end.'
-
- ''',
-
- 'apostrophes': r'''
-
- >>> hl = Highlighter('en')
- >>> hl.makeSample("A boring start. Hello world's indeed. A boring end.", ['world'], 40, ('<', '>'))
- "A boring start. Hello <world's> indeed..."
-
- ''',
-
-}
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod (sys.modules[__name__])
diff --git a/secore/indexerconnection.py b/secore/indexerconnection.py
deleted file mode 100644
index 84ea9a1..0000000
--- a/secore/indexerconnection.py
+++ /dev/null
@@ -1,382 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""indexerconnection.py: A connection to the search engine for indexing.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import xapian as _xapian
-from datastructures import *
-from fieldactions import *
-import fieldmappings as _fieldmappings
-import errors as _errors
-import os as _os
-import cPickle as _cPickle
-
-class IndexerConnection(object):
- """A connection to the search engine for indexing.
-
- """
-
- def __init__(self, indexpath):
- """Create a new connection to the index.
-
- There may only be one indexer connection for a particular database open
- at a given time. Therefore, if a connection to the database is already
- open, this will raise a xapian.DatabaseLockError.
-
- If the database doesn't already exist, it will be created.
-
- """
- self._index = _xapian.WritableDatabase(indexpath, _xapian.DB_CREATE_OR_OPEN)
- self._indexpath = indexpath
-
- # Read existing actions.
- self._field_actions = {}
- self._field_mappings = _fieldmappings.FieldMappings()
- self._next_docid = 0
- self._config_modified = False
- self._load_config()
-
- def _store_config(self):
- """Store the configuration for the database.
-
- Currently, this stores the configuration in a file in the database
- directory, so changes to it are not protected by transactions. When
- support is available in xapian for storing metadata associated with
- databases. this will be used instead of a file.
-
- """
- config_str = _cPickle.dumps((
- self._field_actions,
- self._field_mappings.serialise(),
- self._next_docid,
- ), 2)
- config_file = _os.path.join(self._indexpath, 'config')
- fd = open(config_file, "w")
- fd.write(config_str)
- fd.close()
- self._config_modified = False
-
- def _load_config(self):
- """Load the configuration for the database.
-
- """
- config_file = _os.path.join(self._indexpath, 'config')
- if not _os.path.exists(config_file):
- return
- fd = open(config_file)
- config_str = fd.read()
- if not config_str:
- return
- fd.close()
-
- (self._field_actions, mappings, self._next_docid) = _cPickle.loads(config_str)
- self._field_mappings = _fieldmappings.FieldMappings(mappings)
- self._config_modified = False
-
- def _allocate_id(self):
- """Allocate a new ID.
-
- """
- while True:
- idstr = "%x" % self._next_docid
- self._next_docid += 1
- if not self._index.term_exists('Q' + idstr):
- break
- self._config_modified = True
- return idstr
-
- def add_field_action(self, fieldname, fieldtype, **kwargs):
- """Add an action to be performed on a field.
-
- Note that this change to the configuration will not be preserved on
- disk until the next call to flush().
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- if fieldname in self._field_actions:
- actions = self._field_actions[fieldname]
- else:
- actions = FieldActions(fieldname)
- self._field_actions[fieldname] = actions
- actions.add(self._field_mappings, fieldtype, **kwargs)
- self._config_modified = True
-
- def clear_field_actions(self, fieldname):
- """Clear all actions for the specified field.
-
- This does not report an error if there are already no actions for the
- specified field.
-
- Note that this change to the configuration will not be preserved on
- disk until the next call to flush().
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- if fieldname in self._field_actions:
- del self._field_actions[fieldname]
- self._config_modified = True
-
- def process(self, document):
- """Process an UnprocessedDocument with the settings in this database.
-
- The resulting ProcessedDocument is returned.
-
- Note that this processing will be automatically performed if an
- UnprocessedDocument is supplied to the add() or replace() methods of
- IndexerConnection. This method is exposed to allow the processing to
- be performed separately, which may be desirable if you wish to manually
- modify the processed document before adding it to the database, or if
- you want to split processing of documents from adding documents to the
- database for performance reasons.
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- result = ProcessedDocument(self._field_mappings)
- result.id = document.id
- context = ActionContext(self._index)
-
- for field in document.fields:
- try:
- actions = self._field_actions[field.name]
- except KeyError:
- # If no actions are defined, just ignore the field.
- continue
- actions.perform(result, field.value, context)
-
- return result
-
- def add(self, document):
- """Add a new document to the search engine index.
-
- If the document has a id set, and the id already exists in
- the database, an exception will be raised. Use the replace() method
- instead if you wish to overwrite documents.
-
- Returns the id of the newly added document (making up a new
- unique ID if no id was set).
-
- The supplied document may be an instance of UnprocessedDocument, or an
- instance of ProcessedDocument.
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- if not hasattr(document, '_doc'):
- # It's not a processed document.
- document = self.process(document)
-
- # Ensure that we have a id
- orig_id = document.id
- if orig_id is None:
- id = self._allocate_id()
- document.id = id
- else:
- id = orig_id
- if self._index.term_exists('Q' + id):
- raise _errors.IndexerError("Document ID of document supplied to add() is not unique.")
-
- # Add the document.
- xapdoc = document.prepare()
- self._index.add_document(xapdoc)
-
- if id is not orig_id:
- document.id = orig_id
- return id
-
- def replace(self, document):
- """Replace a document in the search engine index.
-
- If the document does not have a id set, an exception will be
- raised.
-
- If the document has a id set, and the id does not already
- exist in the database, this method will have the same effect as add().
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- if not hasattr(document, '_doc'):
- # It's not a processed document.
- document = self.process(document)
-
- # Ensure that we have a id
- id = document.id
- if id is None:
- raise _errors.IndexerError("No document ID set for document supplied to replace().")
-
- xapdoc = document.prepare()
- self._index.replace_document('Q' + id, xapdoc)
-
- def delete(self, id):
- """Delete a document from the search engine index.
-
- If the id does not already exist in the database, this method
- will have no effect (and will not report an error).
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- self._index.delete_document('Q' + id)
-
- def flush(self):
- """Apply recent changes to the database.
-
- If an exception occurs, any changes since the last call to flush() may
- be lost.
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- if self._config_modified:
- self._store_config()
- self._index.flush()
-
- def close(self):
- """Close the connection to the database.
-
- It is important to call this method before allowing the class to be
- garbage collected, because it will ensure that any un-flushed changes
- will be flushed. It also ensures that the connection is cleaned up
- promptly.
-
- No other methods may be called on the connection after this has been
- called. (It is permissible to call close() multiple times, but
- only the first call will have any effect.)
-
- If an exception occurs, the database will be closed, but changes since
- the last call to flush may be lost.
-
- """
- if self._index is None:
- return
- try:
- self.flush()
- finally:
- # There is currently no "close()" method for xapian databases, so
- # we have to rely on the garbage collector. Since we never copy
- # the _index property out of this class, there should be no cycles,
- # so the standard python implementation should garbage collect
- # _index straight away. A close() method is planned to be added to
- # xapian at some point - when it is, we should call it here to make
- # the code more robust.
- self._index = None
- self._indexpath = None
- self._field_actions = None
- self._config_modified = False
-
- def get_doccount(self):
- """Count the number of documents in the database.
-
- This count will include documents which have been added or removed but
- not yet flushed().
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- return self._index.get_doccount()
-
- def iterids(self):
- """Get an iterator which returns all the ids in the database.
-
- The unqiue_ids are currently returned in binary lexicographical sort
- order, but this should not be relied on.
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- return PrefixedTermIter('Q', self._index.allterms())
-
- def get_document(self, id):
- """Get the document with the specified unique ID.
-
- Raises a KeyError if there is no such document. Otherwise, it returns
- a ProcessedDocument.
-
- """
- if self._index is None:
- raise _errors.IndexerError("IndexerConnection has been closed")
- postlist = self._index.postlist('Q' + id)
- try:
- plitem = postlist.next()
- except StopIteration:
- # Unique ID not found
- raise KeyError('Unique ID %r not found' % id)
- try:
- postlist.next()
- raise _errors.IndexerError("Multiple documents " #pragma: no cover
- "found with same unique ID")
- except StopIteration:
- # Only one instance of the unique ID found, as it should be.
- pass
-
- result = ProcessedDocument(self._field_mappings)
- result.id = id
- result._doc = self._index.get_document(plitem.docid)
- return result
-
-class PrefixedTermIter(object):
- """Iterate through all the terms with a given prefix.
-
- """
- def __init__(self, prefix, termiter):
- """Initialise the prefixed term iterator.
-
- - `prefix` is the prefix to return terms for.
- - `termiter` is a xapian TermIterator, which should be at it's start.
-
- """
-
- # The algorithm used in next() currently only works for single
- # character prefixes, so assert that the prefix is single character.
- # To deal with multicharacter prefixes, we need to check for terms
- # which have a starting prefix equal to that given, but then have a
- # following uppercase alphabetic character, indicating that the actual
- # prefix is longer than the target prefix. We then need to skip over
- # these. Not too hard to implement, but we don't need it yet.
- assert(len(prefix) == 1)
-
- self._started = False
- self._prefix = prefix
- self._prefixlen = len(prefix)
- self._termiter = termiter
-
- def __iter__(self):
- return self
-
- def next(self):
- """Get the next term with the specified prefix.
-
-
- """
- if not self._started:
- term = self._termiter.skip_to(self._prefix).term
- self._started = True
- else:
- term = self._termiter.next().term
- if len(term) < self._prefixlen or term[:self._prefixlen] != self._prefix:
- raise StopIteration
- return term[self._prefixlen:]
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod (sys.modules[__name__])
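Read together, the docstrings in the removed datastructures.py, fieldactions.py and indexerconnection.py describe the indexing half of the secore API that this merge drops in favour of the new indexstore. A minimal sketch of how it was driven, with an invented index path and field names:

    # Illustrative use of the secore indexing API removed by this merge.
    # '/tmp/example-index', 'title' and 'text' are made up for the example.
    from secore import IndexerConnection, UnprocessedDocument, Field
    from secore.fieldactions import FieldActions

    conn = IndexerConnection('/tmp/example-index')

    # Field actions must be configured before documents are processed.
    conn.add_field_action('title', FieldActions.STORE_CONTENT)
    conn.add_field_action('title', FieldActions.INDEX_FREETEXT, weight=5)
    conn.add_field_action('text', FieldActions.INDEX_FREETEXT, language='en')

    doc = UnprocessedDocument()
    doc.fields.append(Field('title', 'Hello world'))
    doc.fields.append(Field('text', 'A short example document.'))

    docid = conn.add(doc)   # processes the document and allocates an id
    conn.flush()            # persists both the index and the field configuration
    conn.close()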
diff --git a/secore/marshall.py b/secore/marshall.py
deleted file mode 100644
index ebcc71d..0000000
--- a/secore/marshall.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""marshall.py: Marshal values into strings
-
-"""
-__docformat__ = "restructuredtext en"
-
-import math
-
-def _long_to_base256_array(value, length, flip):
- result = []
- for i in xrange(length):
- n = value % 256
- if flip: n = 255 - n
- result.insert(0, chr(n))
- value /= 256
- return result
-
-def float_to_string(value):
- """Marshall a floating point number to a string which sorts in the
- appropriate manner.
-
- """
- mantissa, exponent = math.frexp(value)
- sign = '1'
- if mantissa < 0:
- mantissa = -mantissa
- sign = '0'
-
- # IEEE representation of doubles uses 11 bits for the exponent, with a bias
- # of 1023. There's then another 52 bits in the mantissa, so we need to
- # add 1075 to be sure that the exponent won't be negative.
- # Even then, we check that the exponent isn't negative, and consider the
- # value to be equal to zero if it is.
- exponent += 1075
- if exponent < 0: # Note - this can't happen on most architectures #pragma: no cover
- exponent = 0
- mantissa = 0
- elif mantissa == 0:
- exponent = 0
-
- # IEEE representation of doubles uses 52 bits for the mantissa. Convert it
- # to a 7 character string, and convert the exponent to a 2 character
- # string.
-
- mantissa = long(mantissa * (2**52))
-
- digits = [sign]
- digits.extend(_long_to_base256_array(exponent, 2, sign == '0'))
- digits.extend(_long_to_base256_array(mantissa, 7, sign == '0'))
-
- return ''.join(digits)
-
-def date_to_string(date):
- """Marshall a date to a string which sorts in the appropriate manner.
-
- """
- return '%04d%02d%02d' % (date.year, date.month, date.day)
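The point of these marshallers is that the resulting strings sort lexicographically in the same order as the underlying values, which is what Xapian's value slots need. A quick self-check, assuming the pre-removal secore package is still importable (Python 2, like the code above):

    from datetime import date
    from secore import marshall

    values = [-10.5, -1.0, 0.0, 0.25, 3.14, 100.0]
    keys = [marshall.float_to_string(v) for v in values]
    # sorting the marshalled strings reproduces numeric order
    assert sorted(keys) == [marshall.float_to_string(v) for v in sorted(values)]

    # dates become YYYYMMDD, which also sorts correctly as plain text
    assert marshall.date_to_string(date(2008, 10, 8)) == '20081008'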
diff --git a/secore/parsedate.py b/secore/parsedate.py
deleted file mode 100644
index 684d5f2..0000000
--- a/secore/parsedate.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""parsedate.py: Parse date strings.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import datetime
-import re
-
-yyyymmdd_re = re.compile(r'(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})$')
-yyyy_mm_dd_re = re.compile(r'(?P<year>[0-9]{4})([-/.])(?P<month>[0-9]{2})\2(?P<day>[0-9]{2})$')
-
-def date_from_string(value):
- """Parse a string into a date.
-
- If the value supplied is already a date-like object (ie, has 'year',
- 'month' and 'day' attributes), it is returned without processing.
-
- Supported date formats are:
-
- - YYYYMMDD
- - YYYY-MM-DD
- - YYYY/MM/DD
- - YYYY.MM.DD
-
- """
- if (hasattr(value, 'year')
- and hasattr(value, 'month')
- and hasattr(value, 'day')):
- return value
-
- mg = yyyymmdd_re.match(value)
- if mg is None:
- mg = yyyy_mm_dd_re.match(value)
-
- if mg is not None:
- year, month, day = (int(i) for i in mg.group('year', 'month', 'day'))
- return datetime.date(year, month, day)
-
- raise ValueError('Unrecognised date format')
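Usage is straightforward; the sketch below assumes the pre-removal secore package is importable and exercises the four documented formats:

    from datetime import date
    from secore.parsedate import date_from_string

    # all four documented formats yield the same date object
    for text in ('20081008', '2008-10-08', '2008/10/08', '2008.10.08'):
        assert date_from_string(text) == date(2008, 10, 8)

    # date-like objects pass straight through
    today = date.today()
    assert date_from_string(today) is today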
diff --git a/secore/searchconnection.py b/secore/searchconnection.py
deleted file mode 100644
index 79fa509..0000000
--- a/secore/searchconnection.py
+++ /dev/null
@@ -1,618 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2007 Lemur Consulting Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-r"""searchconnection.py: A connection to the search engine for searching.
-
-"""
-__docformat__ = "restructuredtext en"
-
-import xapian as _xapian
-from datastructures import *
-from fieldactions import *
-import fieldmappings as _fieldmappings
-import highlight as _highlight
-import errors as _errors
-import os as _os
-import cPickle as _cPickle
-
-class SearchResult(ProcessedDocument):
- """A result from a search.
-
- """
- def __init__(self, msetitem, results):
- ProcessedDocument.__init__(self, results._fieldmappings, msetitem.document)
- self.rank = msetitem.rank
- self._results = results
-
- def _get_language(self, field):
- """Get the language that should be used for a given field.
-
- """
- actions = self._results._conn._field_actions[field]._actions
- for action, kwargslist in actions.iteritems():
- if action == FieldActions.INDEX_FREETEXT:
- for kwargs in kwargslist:
- try:
- return kwargs['language']
- except KeyError:
- pass
- return 'none'
-
- def summarise(self, field, maxlen=600, hl=('<b>', '</b>')):
- """Return a summarised version of the field specified.
-
- This will return a summary of the contents of the field stored in the
- search result, with words which match the query highlighted.
-
- The maximum length of the summary (in characters) may be set using the
- maxlen parameter.
-
- The return value will be a string holding the summary, with
- highlighting applied. If there are multiple instances of the field in
- the document, the instances will be joined with a newline character.
-
- To turn off highlighting, set hl to None. Each highlight will consist
- of the first entry in the `hl` list being placed before the word, and
- the second entry in the `hl` list being placed after the word.
-
- Any XML or HTML style markup tags in the field will be stripped before
- the summarisation algorithm is applied.
-
- """
- highlighter = _highlight.Highlighter(language_code=self._get_language(field))
- field = self.data[field]
- results = []
- text = '\n'.join(field)
- return highlighter.makeSample(text, self._results._query, maxlen, hl)
-
- def highlight(self, field, hl=('<b>', '</b>'), strip_tags=False):
- """Return a highlighted version of the field specified.
-
- This will return all the contents of the field stored in the search
- result, with words which match the query highlighted.
-
- The return value will be a list of strings (corresponding to the list
- of strings which is the raw field data).
-
- Each highlight will consist of the first entry in the `hl` list being
- placed before the word, and the second entry in the `hl` list being
- placed after the word.
-
- If `strip_tags` is True, any XML or HTML style markup tags in the field
- will be stripped before highlighting is applied.
-
- """
- highlighter = _highlight.Highlighter(language_code=self._get_language(field))
- field = self.data[field]
- results = []
- for text in field:
- results.append(highlighter.highlight(text, self._results._query, hl, strip_tags))
- return results
-
- def __repr__(self):
- return ('<SearchResult(rank=%d, id=%r, data=%r)>' %
- (self.rank, self.id, self.data))
-
-
-class SearchResultIter(object):
- """An iterator over a set of results from a search.
-
- """
- def __init__(self, results):
- self._results = results
- self._iter = iter(results._mset)
-
- def next(self):
- msetitem = self._iter.next()
- return SearchResult(msetitem,
- self._results)
-
-
-class SearchResults(object):
- """A set of results of a search.
-
- """
- def __init__(self, conn, enq, query, mset, fieldmappings):
- self._conn = conn
- self._enq = enq
- self._query = query
- self._mset = mset
- self._fieldmappings = fieldmappings
-
- def __repr__(self):
- return ("<SearchResults(startrank=%d, "
- "endrank=%d, "
- "more_matches=%s, "
- "matches_lower_bound=%d, "
- "matches_upper_bound=%d, "
- "matches_estimated=%d, "
- "estimate_is_exact=%s)>" %
- (
- self.startrank,
- self.endrank,
- self.more_matches,
- self.matches_lower_bound,
- self.matches_upper_bound,
- self.matches_estimated,
- self.estimate_is_exact,
- ))
-
- def _get_more_matches(self):
- # This check relies on us having asked for at least one more result
- # than retrieved to be checked.
- return (self.matches_lower_bound > self.endrank)
- more_matches = property(_get_more_matches, doc=
- """Check whether there are further matches after those in this result set.
-
- """)
- def _get_startrank(self):
- return self._mset.get_firstitem()
- startrank = property(_get_startrank, doc=
- """Get the rank of the first item in the search results.
-
- This corresponds to the "startrank" parameter passed to the search() method.
-
- """)
- def _get_endrank(self):
- return self._mset.get_firstitem() + len(self._mset)
- endrank = property(_get_endrank, doc=
- """Get the rank of the item after the end of the search results.
-
- If there are sufficient results in the index, this corresponds to the
- "endrank" parameter passed to the search() method.
-
- """)
- def _get_lower_bound(self):
- return self._mset.get_matches_lower_bound()
- matches_lower_bound = property(_get_lower_bound, doc=
- """Get a lower bound on the total number of matching documents.
-
- """)
- def _get_upper_bound(self):
- return self._mset.get_matches_upper_bound()
- matches_upper_bound = property(_get_upper_bound, doc=
- """Get an upper bound on the total number of matching documents.
-
- """)
- def _get_estimated(self):
- return self._mset.get_matches_estimated()
- matches_estimated = property(_get_estimated, doc=
- """Get an estimate for the total number of matching documents.
-
- """)
- def _estimate_is_exact(self):
- return self._mset.get_matches_lower_bound() == \
- self._mset.get_matches_upper_bound()
- estimate_is_exact = property(_estimate_is_exact, doc=
- """Check whether the estimated number of matching documents is exact.
-
- If this returns true, the estimate given by the `matches_estimated`
- property is guaranteed to be correct.
-
- If this returns false, it is possible that the actual number of matching
- documents is different from the number given by the `matches_estimated`
- property.
-
- """)
-
- def get_hit(self, index):
- """Get the hit with a given index.
-
- """
- msetitem = self._mset.get_hit(index)
- return SearchResult(msetitem, self)
- __getitem__ = get_hit
-
- def __iter__(self):
- """Get an iterator over the hits in the search result.
-
- The iterator returns the results in increasing order of rank.
-
- """
- return SearchResultIter(self)
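To make the result-set bookkeeping above concrete, a hedged usage sketch; the index path 'index' is a placeholder for a directory built with the companion IndexerConnection, and the query text is arbitrary:

    from secore.searchconnection import SearchConnection

    conn = SearchConnection('index')          # placeholder index directory
    query = conn.query_parse('journal entry')
    results = conn.search(query, 0, 10)       # ranks [0, 10), checks >= 11

    ids = [hit.id for hit in results]         # iterates in increasing rank
    assert results.startrank == 0
    assert results.endrank == results.startrank + len(ids)
    if results.estimate_is_exact:
        total = results.matches_estimated     # exact when the bounds coincide
    conn.close()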
-
-class SearchConnection(object):
- """A connection to the search engine for searching.
-
- The connection will access a view of the database.
-
- """
-
- def __init__(self, indexpath):
- """Create a new connection to the index for searching.
-
-        There may be an arbitrary number of search connections open for a
-        particular database at a given time (regardless of whether there
-        is a connection for indexing open as well).
-
- If the database doesn't exist, an exception will be raised.
-
- """
- self._index = _xapian.Database(indexpath)
- self._indexpath = indexpath
-
- # Read the actions.
- self._load_config()
-
- def _get_sort_type(self, field):
- """Get the sort type that should be used for a given field.
-
- """
- actions = self._field_actions[field]._actions
- for action, kwargslist in actions.iteritems():
- if action == FieldActions.SORT_AND_COLLAPSE:
- for kwargs in kwargslist:
- return kwargs['type']
-
- def _load_config(self):
- """Load the configuration for the database.
-
- """
- # Note: this code is basically duplicated in the IndexerConnection
- # class. Move it to a shared location.
- config_file = _os.path.join(self._indexpath, 'config')
- if not _os.path.exists(config_file):
- self._field_mappings = _fieldmappings.FieldMappings()
- return
- fd = open(config_file)
- config_str = fd.read()
- fd.close()
-
- (self._field_actions, mappings, next_docid) = _cPickle.loads(config_str)
- self._field_mappings = _fieldmappings.FieldMappings(mappings)
-
- def reopen(self):
- """Reopen the connection.
-
- This updates the revision of the index which the connection references
- to the latest flushed revision.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- self._index.reopen()
- # Re-read the actions.
- self._load_config()
-
- def close(self):
- """Close the connection to the database.
-
- It is important to call this method before allowing the class to be
- garbage collected to ensure that the connection is cleaned up promptly.
-
- No other methods may be called on the connection after this has been
- called. (It is permissible to call close() multiple times, but
- only the first call will have any effect.)
-
- If an exception occurs, the database will be closed, but changes since
- the last call to flush may be lost.
-
- """
- if self._index is None:
- return
- # There is currently no "close()" method for xapian databases, so
- # we have to rely on the garbage collector. Since we never copy
- # the _index property out of this class, there should be no cycles,
- # so the standard python implementation should garbage collect
- # _index straight away. A close() method is planned to be added to
- # xapian at some point - when it is, we should call it here to make
- # the code more robust.
- self._index = None
- self._indexpath = None
- self._field_actions = None
- self._field_mappings = None
-
- def get_doccount(self):
- """Count the number of documents in the database.
-
- This count will include documents which have been added or removed but
- not yet flushed().
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- return self._index.get_doccount()
-
- def get_document(self, id):
- """Get the document with the specified unique ID.
-
- Raises a KeyError if there is no such document. Otherwise, it returns
- a ProcessedDocument.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- postlist = self._index.postlist('Q' + id)
- try:
- plitem = postlist.next()
- except StopIteration:
- # Unique ID not found
- raise KeyError('Unique ID %r not found' % id)
- try:
- postlist.next()
- raise _errors.SearchError("Multiple documents " #pragma: no cover
- "found with same unique ID")
- except StopIteration:
- # Only one instance of the unique ID found, as it should be.
- pass
-
- result = ProcessedDocument(self._field_mappings)
- result.id = id
- result._doc = self._index.get_document(plitem.docid)
- return result
-
- OP_AND = _xapian.Query.OP_AND
- OP_OR = _xapian.Query.OP_OR
- def query_composite(self, operator, queries):
- """Build a composite query from a list of queries.
-
- The queries are combined with the supplied operator, which is either
- SearchConnection.OP_AND or SearchConnection.OP_OR.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- return _xapian.Query(operator, list(queries))
-
- def query_filter(self, query, filter):
- """Filter a query with another query.
-
- Documents will only match the resulting query if they match both
- queries, but will be weighted according to only the first query.
-
- - `query`: The query to filter.
- - `filter`: The filter to apply to the query.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- if not isinstance(filter, _xapian.Query):
- raise _errors.SearchError("Filter must be a Xapian Query object")
- return _xapian.Query(_xapian.Query.OP_FILTER, query, filter)
-
- def query_range(self, field, begin, end):
- """Create a query for a range search.
-
- This creates a query which matches only those documents which have a
- field value in the specified range.
-
- Begin and end must be appropriate values for the field, according to
- the 'type' parameter supplied to the SORTABLE action for the field.
-
- The begin and end values are both inclusive - any documents with a
- value equal to begin or end will be returned (unless end is less than
- begin, in which case no documents will be returned).
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
-
- sorttype = self._get_sort_type(field)
- marshaller = SortableMarshaller(False)
- fn = marshaller.get_marshall_function(field, sorttype)
- begin = fn(field, begin)
- end = fn(field, end)
-
- slot = self._field_mappings.get_slot(field)
- return _xapian.Query(_xapian.Query.OP_VALUE_RANGE, slot, begin, end)
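A hedged sketch of combining the query helpers: the 'date' field name is an assumption about how the indexer was configured (it must carry a SORTABLE date action), not something this module defines:

    from secore.searchconnection import SearchConnection

    conn = SearchConnection('index')          # placeholder index directory
    text = conn.query_parse('read me')
    recent = conn.query_range('date', '20080101', '20081008')

    # weighted by the text query, restricted to the date range
    query = conn.query_filter(text, recent)
    results = conn.search(query, 0, 20)
    conn.close()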
-
- def _prepare_queryparser(self, allow, deny, default_op):
- """Prepare (and return) a query parser using the specified fields and
- operator.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- if allow is not None and deny is not None:
- raise _errors.SearchError("Cannot specify both `allow` and `deny`")
- qp = _xapian.QueryParser()
- qp.set_database(self._index)
- qp.set_default_op(default_op)
-
- if allow is None:
- allow = [key for key in self._field_actions]
- if deny is not None:
- allow = [key for key in allow if key not in deny]
-
- for field in allow:
- actions = self._field_actions[field]._actions
- for action, kwargslist in actions.iteritems():
- if action == FieldActions.INDEX_EXACT:
- # FIXME - need patched version of xapian to add exact prefixes
- #qp.add_exact_prefix(field, self._field_mappings.get_prefix(field))
- qp.add_prefix(field, self._field_mappings.get_prefix(field))
- if action == FieldActions.INDEX_FREETEXT:
- qp.add_prefix(field, self._field_mappings.get_prefix(field))
- for kwargs in kwargslist:
- try:
- lang = kwargs['language']
- qp.set_stemmer(_xapian.Stem(lang))
- qp.set_stemming_strategy(qp.STEM_SOME)
- except KeyError:
- pass
- return qp
-
- def query_parse(self, string, allow=None, deny=None, default_op=OP_AND):
- """Parse a query string.
-
- This is intended for parsing queries entered by a user. If you wish to
- combine structured queries, it is generally better to use the other
- query building methods, such as `query_composite`.
-
- - `string`: The string to parse.
- - `allow`: A list of fields to allow in the query.
- - `deny`: A list of fields not to allow in the query.
-
- Only one of `allow` and `deny` may be specified.
-
- If any of the entries in `allow` or `deny` are not present in the
- configuration for the database, an exception will be raised.
-
- Returns a Query object, which may be passed to the search() method, or
- combined with other queries.
-
- """
- qp = self._prepare_queryparser(allow, deny, default_op)
- try:
- return qp.parse_query(string)
- except _xapian.QueryParserError, e:
- # If we got a parse error, retry without boolean operators (since
- # these are the usual cause of the parse error).
- return qp.parse_query(string, 0)
-
- def query_field(self, field, value, default_op=OP_AND):
- """A query for a single field.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- try:
- actions = self._field_actions[field]._actions
- except KeyError:
- actions = {}
-
- # need to check on field type, and stem / split as appropriate
- for action, kwargslist in actions.iteritems():
- if action == FieldActions.INDEX_EXACT:
- prefix = self._field_mappings.get_prefix(field)
- if len(value) > 0:
- chval = ord(value[0])
- if chval >= ord('A') and chval <= ord('Z'):
- prefix = prefix + ':'
- return _xapian.Query(prefix + value)
- if action == FieldActions.INDEX_FREETEXT:
- qp = _xapian.QueryParser()
- qp.set_default_op(default_op)
- prefix = self._field_mappings.get_prefix(field)
- for kwargs in kwargslist:
- try:
- lang = kwargs['language']
- qp.set_stemmer(_xapian.Stem(lang))
- qp.set_stemming_strategy(qp.STEM_SOME)
- except KeyError:
- pass
- return qp.parse_query(value,
- qp.FLAG_PHRASE | qp.FLAG_BOOLEAN | qp.FLAG_LOVEHATE,
- prefix)
-
- return _xapian.Query()
-
- def query_all(self):
- """A query which matches all the documents in the database.
-
- """
- return _xapian.Query('')
-
- def spell_correct(self, string, allow=None, deny=None):
- """Correct a query spelling.
-
- This returns a version of the query string with any misspelt words
- corrected.
-
- - `allow`: A list of fields to allow in the query.
- - `deny`: A list of fields not to allow in the query.
-
- Only one of `allow` and `deny` may be specified.
-
- If any of the entries in `allow` or `deny` are not present in the
- configuration for the database, an exception will be raised.
-
- """
- qp = self._prepare_queryparser(allow, deny, self.OP_AND)
- qp.parse_query(string, qp.FLAG_PHRASE|qp.FLAG_BOOLEAN|qp.FLAG_LOVEHATE|qp.FLAG_SPELLING_CORRECTION)
- corrected = qp.get_corrected_query_string()
- if len(corrected) == 0:
- if isinstance(string, unicode):
- # Encode as UTF-8 for consistency - this happens automatically
- # to values passed to Xapian.
- return string.encode('utf-8')
- return string
- return corrected
-
- def search(self, query, startrank, endrank,
- checkatleast=0, sortby=None, collapse=None):
- """Perform a search, for documents matching a query.
-
- - `query` is the query to perform.
- - `startrank` is the rank of the start of the range of matching
- documents to return (ie, the result with this rank will be returned).
-          Ranks start at 0, which represents the "best" matching document.
- - `endrank` is the rank at the end of the range of matching documents
- to return. This is exclusive, so the result with this rank will not
- be returned.
- - `checkatleast` is the minimum number of results to check for: the
- estimate of the total number of matches will always be exact if
- the number of matches is less than `checkatleast`.
- - `sortby` is the name of a field to sort by. It may be preceded by a
- '+' or a '-' to indicate ascending or descending order
-          (respectively). If the first character is neither '+' nor '-', the
- sort will be in ascending order.
- - `collapse` is the name of a field to collapse the result documents
- on. If this is specified, there will be at most one result in the
- result set for each value of the field.
-
- """
- if self._index is None:
- raise _errors.SearchError("SearchConnection has been closed")
- enq = _xapian.Enquire(self._index)
- enq.set_query(query)
-
- if sortby is not None:
- asc = True
- if sortby[0] == '-':
- asc = False
- sortby = sortby[1:]
- elif sortby[0] == '+':
- sortby = sortby[1:]
-
- try:
- slotnum = self._field_mappings.get_slot(sortby)
- except KeyError:
- raise _errors.SearchError("Field %r was not indexed for sorting" % sortby)
-
- # Note: we invert the "asc" parameter, because xapian treats
- # "ascending" as meaning "higher values are better"; in other
- # words, it considers "ascending" to mean return results in
- # descending order.
- enq.set_sort_by_value_then_relevance(slotnum, not asc)
-
- if collapse is not None:
- try:
- slotnum = self._field_mappings.get_slot(collapse)
- except KeyError:
- raise _errors.SearchError("Field %r was not indexed for collapsing" % collapse)
- enq.set_collapse_key(slotnum)
-
- maxitems = max(endrank - startrank, 0)
- # Always check for at least one more result, so we can report whether
- # there are more matches.
- checkatleast = max(checkatleast, endrank + 1)
-
- enq.set_docid_order(enq.DONT_CARE)
-
- # Repeat the search until we don't get a DatabaseModifiedError
- while True:
- try:
- mset = enq.get_mset(startrank, maxitems, checkatleast)
- break
- except _xapian.DatabaseModifiedError, e:
- self.reopen()
- return SearchResults(self, enq, query, mset, self._field_mappings)
-
-if __name__ == '__main__':
- import doctest, sys
- doctest.testmod (sys.modules[__name__])
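End to end, the removed search API reads roughly as below; the index path and the 'title', 'fulltext', 'mtime' and 'activity' field names are assumptions about the caller's indexing configuration, not part of this module:

    from secore.searchconnection import SearchConnection

    conn = SearchConnection('index')                     # placeholder index
    query = conn.query_parse('paint', allow=['title', 'fulltext'])

    # newest first ('-' prefix), at most one hit per activity
    results = conn.search(query, 0, 25,
                          sortby='-mtime', collapse='activity')
    for hit in results:
        snippet = hit.summarise('fulltext', maxlen=200)  # highlighted summary
    conn.close()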
diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am
index 480a5a7..062e9b0 100644
--- a/src/olpc/datastore/Makefile.am
+++ b/src/olpc/datastore/Makefile.am
@@ -1,11 +1,23 @@
datastoredir = $(pythondir)/olpc/datastore
datastore_PYTHON = \
__init__.py \
- backingstore.py \
- bin_copy.py \
- converter.py \
datastore.py \
- model.py \
- xapianindex.py \
- utils.py \
- __version__.py
+ filestore.py \
+ indexstore.py \
+ layoutmanager.py \
+ metadatastore.py \
+ migration.py \
+ optimizer.py
+
+AM_CPPFLAGS = \
+ $(WARN_CFLAGS) \
+ $(EXT_CFLAGS) \
+ $(PYTHON_INCLUDES)
+
+AM_LDFLAGS = -module -avoid-version
+
+pkgpyexecdir = $(pythondir)/olpc/datastore
+pkgpyexec_LTLIBRARIES = metadatareader.la
+
+metadatareader_la_SOURCES = \
+ metadatareader.c
diff --git a/src/olpc/datastore/__init__.py b/src/olpc/datastore/__init__.py
index fd38d75..8b13789 100644
--- a/src/olpc/datastore/__init__.py
+++ b/src/olpc/datastore/__init__.py
@@ -1,5 +1 @@
-# datastore package
-from olpc.datastore.datastore import DataStore, DS_LOG_CHANNEL
-
-
diff --git a/src/olpc/datastore/__version__.py b/src/olpc/datastore/__version__.py
deleted file mode 100644
index 98d1028..0000000
--- a/src/olpc/datastore/__version__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-##
-## src/olpc/datastore/__version__.py -- Version Information for unknown (syntax: Python)
-## [automatically generated and maintained by GNU shtool]
-##
-
-class version:
- v_hex = 0x002200
- v_short = "0.2.0"
- v_long = "0.2.0 (08-May-2007)"
- v_tex = "This is unknown, Version 0.2.0 (08-May-2007)"
- v_gnu = "unknown 0.2.0 (08-May-2007)"
- v_web = "unknown/0.2.0"
- v_sccs = "@(#)unknown 0.2.0 (08-May-2007)"
- v_rcs = "$Id: unknown 0.2.0 (08-May-2007) $"
-
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
deleted file mode 100644
index cd23680..0000000
--- a/src/olpc/datastore/backingstore.py
+++ /dev/null
@@ -1,987 +0,0 @@
-"""
-olpc.datastore.backingstore
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-management of stable storage for the datastore
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-import cPickle as pickle
-from datetime import datetime
-import gnomevfs
-import os
-import re
-import sha
-import subprocess
-import time
-import threading
-import errno
-import shutil
-import urllib
-import traceback
-import sys
-
-import dbus
-import xapian
-import gobject
-
-try:
- import cjson
- has_cjson = True
-except ImportError:
- import simplejson
- has_cjson = False
-
-from olpc.datastore.xapianindex import IndexManager
-from olpc.datastore import bin_copy
-from olpc.datastore import utils
-from olpc.datastore import model
-
-# changing this pattern impacts _targetFile
-filename_attempt_pattern = re.compile('\(\d+\)$')
-
-import logging
-DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
-logger = logging.getLogger(DS_LOG_CHANNEL)
-#logger.setLevel(logging.DEBUG)
-
-class BackingStore(object):
- """Backing stores manage stable storage. We abstract out the
- management of file/blob storage through this class, as well as the
- location of the backing store itself (it may be done via a network
- connection for example).
-
- While the backingstore is responsible for implementing the
-    metadata interface, no implementation is provided here. It is
- assumed by that interface that all the features enumerated in
- olpc.datastore.model are provided.
-
- """
- def __init__(self, uri, **kwargs):
- """The kwargs are used to configure the backend so it can
- provide its interface. See specific backends for details
- """
- pass
-
- def __repr__(self):
- return "<%s %s: %s %s>" % (self.__class__.__name__, self.id,
- self.title, self.uri)
- # Init phases
- @staticmethod
- def parse(uri):
- """parse the uri into an actionable mount-point.
- Returns True or False indicating if this backend handles a
- given uri.
- """
- return False
-
- def initialize_and_load(self):
- """phase to check the state of the located mount point, this
- method returns True (mount point is valid) or False (invalid
- or uninitialized mount point).
-
- self.check() which must return a boolean should check if the
- result of self.locate() is already a datastore and then
- initialize/load it according to self.options.
-
- When True self.load() is invoked.
- When False self.create() followed by self.load() is invoked.
- """
- if self.check() is False:
- self.initialize()
- self.load()
-
- def check(self):
- return False
-
- def load(self):
- """load the index for a given mount-point, then initialize its
- fulltext subsystem. This is the routine that will bootstrap
- the indexmanager (though create() may have just created it)
- """
- pass
-
- def initialize(self):
- """Initialize a new mount point"""
- pass
-
- # Informational
- def descriptor(self):
- """return a dict with atleast the following keys
- 'id' -- the id used to refer explicitly to the mount point
- 'title' -- Human readable identifier for the mountpoint
- 'uri' -- The uri which triggered the mount
- """
- pass
-
- @property
- def id(self): return self.descriptor()['id']
- @property
- def title(self): return self.descriptor()['title']
-
- # Storage Translation
- def localizedName(self, uid=None, content=None, target=None):
- """taking any of uid, a content object, or a direct target
- filename (which includes all of the relative components under a
- store). Return the localized filename that should be used _within_
- the repository for the storage of this content object
- """
- pass
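The contract above is easiest to see in a minimal subclass; the sketch below (the 'null:' scheme and all of its behaviour) is purely illustrative and not part of the datastore:

    class NullBackingStore(BackingStore):
        "Illustrative mount that stores nothing."

        @staticmethod
        def parse(uri):
            # claim only uris of the form null:<anything>
            return uri.startswith('null:')

        def __init__(self, uri, **kwargs):
            BackingStore.__init__(self, uri, **kwargs)
            self.uri = uri

        def check(self):
            return True          # nothing needs creating on disk

        def descriptor(self):
            return {'id': self.uri, 'uri': self.uri, 'title': 'null store'}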
-
-import time
-class AsyncCopy:
- CHUNK_SIZE=65536
-
- def __init__(self, src, dest, completion):
- self.src = src
- self.dest = dest
- self.completion = completion
- self.src_fp = -1
- self.dest_fp = -1
- self.written = 0
- self.size = 0
-
- def _cleanup(self):
- os.close(self.src_fp)
- os.close(self.dest_fp)
-
- def _copy_block(self, user_data=None):
- try:
- data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
- count = os.write(self.dest_fp, data)
- self.written += len(data)
-
- # error writing data to file?
- if count < len(data):
- logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
- self._cleanup()
- self.completion(RuntimeError("Error writing data to destination file"))
- return False
-
- # FIXME: emit progress here
-
- # done?
- if len(data) < AsyncCopy.CHUNK_SIZE:
- logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
- self._cleanup()
- self.completion(None, self.dest)
- return False
- except Exception, err:
- logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
- self._cleanup()
- self.completion(err)
- return False
-
- return True
-
- def start(self):
- self.src_fp = os.open(self.src, os.O_RDONLY)
- self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
-
- stat = os.fstat(self.src_fp)
- self.size = stat[6]
-
- logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
-
- self.tstart = time.time()
- sid = gobject.idle_add(self._copy_block)
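AsyncCopy only makes progress from the GLib main loop, one CHUNK_SIZE block per idle callback. A hedged usage sketch (the paths are placeholders; Python 2 with the static gobject bindings imported above):

    def done(error, path=None):
        # called as completion(exc) on failure or completion(None, dest) on success
        if error is not None:
            logger.error("copy failed: %r" % error)
        else:
            logger.debug("copied to %s" % path)
        loop.quit()

    loop = gobject.MainLoop()
    AsyncCopy('/tmp/src.bin', '/tmp/dest.bin', done).start()
    loop.run()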
-
-class FileBackingStore(BackingStore):
- """ A backing store that directs maps the storage of content
- objects to an available filesystem.
-
-
- # not really true, the change would result in the older version
- having the last content and the new version as well. The old one
- is left in the newest start rather than start state. if that can work...
- The semantics of interacting
- with the repositories mean that the current copy _can_ be edited
-    in place. It's the actions create/update that create new revisions
- in the datastore and hence new versions.
- """
- STORE_NAME = "store"
- INDEX_NAME = "index"
- DESCRIPTOR_NAME = "metainfo"
-
- def __init__(self, uri, **kwargs):
- """ FileSystemStore(path=<root of managed storage>)
- """
- self.options = kwargs
- self.local_indexmanager = self.options.get('local_indexmanager', True)
-
- self.uri = uri
- self.base = os.path.join(uri, self.STORE_NAME)
- self.indexmanager = None
-
- """ Current uid of the user that is calling DataStore.get_filename
- through dbus. Needed for security stuff. It is an instance variable
- instead of a method parameter because this is less invasive for Update 1.
- """
- self.current_user_id = None
-
- # source for an idle callback that exports to the file system the
- # metadata from the index
- self._export_metadata_source = None
-
- # Informational
- def descriptor(self):
- """return a dict with atleast the following keys
- 'id' -- the id used to refer explicitly to the mount point
- 'title' -- Human readable identifier for the mountpoint
- 'uri' -- The uri which triggered the mount
- """
- # a hidden file with a pickled dict will live in the base
- # directory for each storage
- fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
- desc = None
- if os.path.exists(fn):
- try:
- fp = open(fn, 'r')
- desc = pickle.load(fp)
- fp.close()
- except:
- desc = None
- if not desc:
- # the data isn't there, this could happen for a number of
- # reasons (the store isn't writeable)
- # or if the information on it was corrupt
- # in this case, just create a new one
- desc = {'id' : self.uri,
- 'uri' : self.uri,
- 'title' : self.uri
- }
- self.create_descriptor(**desc)
-
- return desc
-
-
- def create_descriptor(self, **kwargs):
- # create the information descriptor for this store
- # defaults will be created if need be
- # passing limited values will leave existing keys in place
- kwargs = utils._convert(kwargs)
- fn = os.path.join(self.base, self.DESCRIPTOR_NAME)
- desc = {}
- if os.path.exists(fn):
- fp = open(fn, 'r')
- try:
- desc = pickle.load(fp)
- except:
- desc = {}
- finally:
- fp.close()
-
- desc.update(kwargs)
-
- if 'id' not in desc: desc['id'] = utils.create_uid()
- if 'uri' not in desc: desc['uri'] = self.uri
- if 'title' not in desc: desc['title'] = self.uri
-
- # TODO: Would be better to check if the device is present and
- # don't try to update the descriptor file if it's not.
- try:
- fp = open(fn, 'w')
- pickle.dump(desc, fp)
- fp.close()
- except IOError, e:
- logging.error('Unable to write descriptor:\n' + \
- ''.join(traceback.format_exception(*sys.exc_info())))
-
- @staticmethod
- def parse(uri):
- return os.path.isabs(uri) or os.path.isdir(uri)
-
- def check(self):
- if not os.path.exists(self.uri): return False
- if not os.path.exists(self.base): return False
- return True
-
- def initialize(self):
- if not os.path.exists(self.base):
- os.makedirs(self.base)
-
- # examine options and see what the indexmanager plan is
- if self.local_indexmanager:
- # create a local storage using the indexmanager
- # otherwise we will connect the global manager
- # in load
- index_name = os.path.join(self.base, self.INDEX_NAME)
- options = utils.options_for(self.options, 'indexmanager.')
- im = IndexManager()
- # This will ensure the fulltext and so on are all assigned
- im.bind_to(self)
- im.connect(index_name, **options)
-
- self.create_descriptor(**options)
- self.indexmanager = im
-
- def load(self):
- if not self.indexmanager and self.local_indexmanager:
- # create a local storage using the indexmanager
- # otherwise we will connect the global manager
- # in load
- index_name = os.path.join(self.base, self.INDEX_NAME)
- options = utils.options_for(self.options, 'indexmanager.')
- im = IndexManager()
-
- desc = utils.options_for(self.options,
- 'indexmanager.',
- invert=True)
- if desc: self.create_descriptor(**desc)
-
- # This will ensure the fulltext and so on are all assigned
- im.bind_to(self)
- im.connect(index_name)
-
- self.indexmanager = im
-
- # Check that all entries have their metadata in the file system.
- if not os.path.exists(os.path.join(self.base, '.metadata.exported')):
- uids_to_export = []
- uids = self.indexmanager.get_all_ids()
-
- for uid in uids:
- if not os.path.exists(os.path.join(self.base, uid + '.metadata')):
- uids_to_export.append(uid)
-
- if uids_to_export:
- self._export_metadata_source = gobject.idle_add(
- self._export_metadata, uids_to_export)
- else:
- open(os.path.join(self.base, '.metadata.exported'), 'w').close()
-
- def _export_metadata(self, uids_to_export):
- uid = uids_to_export.pop()
- props = self.indexmanager.get(uid).properties
- self._store_metadata(uid, props)
- return len(uids_to_export) > 0
-
- def bind_to(self, datastore):
- ## signal from datastore that we are being bound to it
- self.datastore = datastore
-
- def localizedName(self, uid=None, content=None, target=None):
- """taking any of uid, a content object, or a direct target
- filename (which includes all of the relative components under a
- store). Return the localized filename that should be used _within_
- the repository for the storage of this content object
- """
- if target: return os.path.join(self.base, target)
- elif content:
- # see if it expects a filename
- fn, ext = content.suggestName()
- if fn: return os.path.join(self.base, fn)
- if ext: return os.path.join(self.base, "%s.%s" %
- (content.id, ext))
- if not uid: uid = content.id
-
- if uid:
- return os.path.join(self.base, uid)
- else:
- raise ValueError("""Nothing submitted to generate internal
- storage name from""")
-
- def _translatePath(self, uid):
- """translate a UID to a path name"""
- # paths into the datastore
- return os.path.join(self.base, str(uid))
-
- def _targetFile(self, uid, target=None, ext=None, env=None):
- logging.debug('FileBackingStore._targetFile: %r %r %r %r' % (uid, target, ext, env))
- # paths out of the datastore, working copy targets
- path = self._translatePath(uid)
- if not os.path.exists(path):
- return None
-
- if target: targetpath = target
- else:
- targetpath = uid.replace('/', '_').replace('.', '__')
- if ext:
- if not ext.startswith('.'): ext = ".%s" % ext
- targetpath = "%s%s" % (targetpath, ext)
-
- use_instance_dir = os.path.exists('/etc/olpc-security') and \
- os.getuid() != self.current_user_id
- if use_instance_dir:
- if not self.current_user_id:
- raise ValueError("Couldn't determine the current user uid.")
- base = os.path.join(os.environ['HOME'], 'isolation', '1', 'uid_to_instance_dir',
- str(self.current_user_id))
- else:
- profile = os.environ.get('SUGAR_PROFILE', 'default')
- base = os.path.join(os.path.expanduser('~'), '.sugar', profile, 'data')
- if not os.path.exists(base):
- os.makedirs(base)
-
- targetpath = os.path.join(base, targetpath)
- attempt = 0
- while os.path.exists(targetpath):
- # here we look for a non-colliding name
- # this is potentially a race and so we abort after a few
- # attempts
- targetpath, ext = os.path.splitext(targetpath)
-
- if filename_attempt_pattern.search(targetpath):
- targetpath = filename_attempt_pattern.sub('', targetpath)
-
- attempt += 1
- if attempt > 9:
- targetpath = "%s(%s).%s" % (targetpath, time.time(), ext)
- break
-
- targetpath = "%s(%s)%s" % (targetpath, attempt, ext)
-
- # Try to make the original file readable. This can fail if the file is
- # in FAT filesystem.
- try:
- os.chmod(path, 0604)
- except OSError, e:
- if e.errno != errno.EPERM:
- raise
-
- # Try to hard link from the original file to the targetpath. This can
- # fail if the file is in a different filesystem. Do a symlink instead.
- try:
- os.link(path, targetpath)
- except OSError, e:
- if e.errno == errno.EXDEV:
- os.symlink(path, targetpath)
- else:
- raise
-
- return open(targetpath, 'r')
-
- def _mapContent(self, uid, fp, path, env=None):
- """map a content object and the file in the repository to a
- working copy.
- """
- # env would contain things like cwd if we wanted to map to a
- # known space
-
- content = self.indexmanager.get(uid)
- # we need to map a copy of the content from the backingstore into the
- # activities addressable space.
- # map this to a rw file
- if fp:
- target, ext = content.suggestName()
- targetfile = self._targetFile(uid, target, ext, env)
- content.file = targetfile
-
- if self.options.get('verify', False):
- c = sha.sha()
- for line in targetfile:
- c.update(line)
- fp.seek(0)
- if c.hexdigest() != content.checksum:
- raise ValueError("Content for %s corrupt" % uid)
- return content
-
- def _writeContent_complete(self, path, completion=None):
- self._set_permissions_if_possible(path)
- if completion is None:
- return path
- completion(None, path)
- return None
-
- def _set_permissions_if_possible(self, path):
- try:
- os.chmod(path, 0604)
- except OSError, e:
- # This can fail for usb sticks.
- if e.errno != errno.EPERM:
- raise
-
- def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
- completion=None):
- """Returns: path of file in datastore (new path if it was copied/moved)"""
- content = None
- if target: path = target
- else:
- path = self._translatePath(uid)
-
- if replace is False and os.path.exists(path):
- raise KeyError("objects with path:%s for uid:%s exists" %(
- path, uid))
-
- if filelike.name != path:
- # protection on inplace stores
- if completion is None:
- bin_copy.bin_copy(filelike.name, path)
- self._set_permissions_if_possible(path)
- return path
-
- if can_move:
- bin_copy.bin_mv(filelike.name, path)
- self._set_permissions_if_possible(path)
- return self._writeContent_complete(path, completion)
-
- # Otherwise, async copy
- aco = AsyncCopy(filelike.name, path, completion)
- aco.start()
- else:
- return self._writeContent_complete(path, completion)
-
- def _checksum(self, filename):
- c = sha.sha()
- fp = open(filename, 'r')
- for line in fp:
- c.update(line)
- fp.close()
- return c.hexdigest()
-
- # File Management API
- def _encode_json(self, metadata, file_path):
- if has_cjson:
- f = open(file_path, 'w')
- f.write(cjson.encode(metadata))
- f.close()
- else:
- simplejson.dump(metadata, open(file_path, 'w'))
-
- def _store_metadata(self, uid, props):
- t = time.time()
- temp_path = os.path.join(self.base, '.temp_metadata')
- props = props.copy()
- for property_name in model.defaultModel.get_external_properties():
- if property_name in props:
- del props[property_name]
- self._encode_json(props, temp_path)
- path = os.path.join(self.base, uid + '.metadata')
- os.rename(temp_path, path)
- logging.debug('exported metadata: %r s.' % (time.time() - t))
-
- def _delete_metadata(self, uid):
- path = os.path.join(self.base, uid + '.metadata')
- if os.path.exists(path):
- os.unlink(path)
-
- def _create_completion(self, uid, props, completion, exc=None, path=None):
- if exc:
- completion(exc)
- return
- try:
- # Index the content this time
- self.indexmanager.index(props, path)
- completion(None, uid)
- except Exception, exc:
- completion(exc)
-
- def create_async(self, props, filelike, can_move=False, completion=None):
- if completion is None:
- raise RuntimeError("Completion must be valid for async create")
- uid = self.indexmanager.index(props)
- self._store_metadata(uid, props)
- props['uid'] = uid
- if filelike:
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- self._writeContent(uid, filelike, replace=False, can_move=can_move,
- completion=lambda *args: self._create_completion(uid, props, completion, *args))
- else:
- completion(None, uid)
-
- def create(self, props, filelike, can_move=False):
- if filelike:
- uid = self.indexmanager.index(props)
- self._store_metadata(uid, props)
- props['uid'] = uid
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
- self.indexmanager.index(props, path)
- return uid
- else:
- uid = self.indexmanager.index(props)
- self._store_metadata(uid, props)
- return uid
-
- def get(self, uid, env=None, allowMissing=False, includeFile=False):
- content = self.indexmanager.get(uid)
- if not content: raise KeyError(uid)
- path = self._translatePath(uid)
- fp = None
- # not all content objects have a file
- if includeFile and os.path.exists(path):
- fp = open(path, 'r')
- # now return a Content object from the model associated with
- # this file object
- content = self._mapContent(uid, fp, path, env)
- if fp:
- fp.close()
- return content
-
- def _update_completion(self, uid, props, completion, exc=None, path=None):
- if exc is not None:
- completion(exc)
- return
- try:
- self.indexmanager.index(props, path)
- completion()
- except Exception, exc:
- completion(exc)
-
- def update_async(self, uid, props, filelike, can_move=False, completion=None):
- logging.debug('backingstore.update_async')
- if filelike is None:
- raise RuntimeError("Filelike must be valid for async update")
- if completion is None:
- raise RuntimeError("Completion must be valid for async update")
-
- props['uid'] = uid
- self._store_metadata(uid, props)
- if filelike:
- uid = self.indexmanager.index(props, filelike)
- props['uid'] = uid
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- self._writeContent(uid, filelike, can_move=can_move,
- completion=lambda *args: self._update_completion(uid, props, completion, *args))
- else:
- self.indexmanager.index(props)
- completion()
-
- def update(self, uid, props, filelike=None, can_move=False):
- props['uid'] = uid
- self._store_metadata(uid, props)
- if filelike:
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- path = self._writeContent(uid, filelike, can_move=can_move)
- self.indexmanager.index(props, path)
- else:
- self.indexmanager.index(props)
-
- def _delete_external_properties(self, uid):
- external_properties = model.defaultModel.get_external_properties()
- for property_name in external_properties:
- file_path = os.path.join(self.base, property_name, uid)
- if os.path.exists(file_path):
- logging.debug('deleting external property: %r' % file_path)
- os.unlink(file_path)
-
- def delete(self, uid, allowMissing=True):
- self._delete_external_properties(uid)
- self._delete_metadata(uid)
-
- self.indexmanager.delete(uid)
- path = self._translatePath(uid)
- if os.path.exists(path):
- os.unlink(path)
- else:
- if not allowMissing:
- raise KeyError("object for uid:%s missing" % uid)
-
- def get_uniquevaluesfor(self, propertyname):
- return self.indexmanager.get_uniquevaluesfor(propertyname)
-
-
- def get_external_property(self, doc_id, key):
- # external properties default to the following storage
- # <repo>/key/uid which is the file containing the external
- # data. its contents is returned by this call
- # when missing or absent '' is returned
- pfile = os.path.join(self.base, key, str(doc_id))
- if os.path.exists(pfile): v = open(pfile, 'r').read()
- else: v = ''
- return dbus.ByteArray(v)
-
-
- def set_external_property(self, doc_id, key, value):
- pdir = os.path.join(self.base, key)
- if not os.path.exists(pdir): os.mkdir(pdir)
- pfile = os.path.join(pdir, doc_id)
- fp = open(pfile, 'w')
- fp.write(value)
- fp.close()
-
-
- def find(self, query, order_by=None, limit=None, offset=0):
- if not limit: limit = 4069
- return self.indexmanager.search(query, start_index=offset, end_index=limit, order_by=order_by)
-
- def ids(self):
- return self.indexmanager.get_all_ids()
-
- def stop(self):
- if self._export_metadata_source is not None:
- gobject.source_remove(self._export_metadata_source)
- self.indexmanager.stop()
-
- def complete_indexing(self):
- self.indexmanager.complete_indexing()
-
-class InplaceFileBackingStore(FileBackingStore):
- """Like the normal FileBackingStore this Backingstore manages the
- storage of files, but doesn't move files into a repository. There
- are no working copies. It simply adds index data through its
-    indexmanager and provides fulltext on top of a regular
- filesystem. It does record its metadata relative to this mount
- point.
-
- This is intended for USB keys and related types of attachable
- storage.
- """
-
- STORE_NAME = ".olpc.store"
-
- def __init__(self, uri, **kwargs):
- # remove the 'inplace:' scheme
- uri = uri[len('inplace:'):]
- super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
- # use the original uri
- self.uri = uri
- self._walk_source = None
-
- @staticmethod
- def parse(uri):
- return uri.startswith("inplace:")
-
- def check(self):
- if not os.path.exists(self.uri): return False
- if not os.path.exists(self.base): return False
- return True
-
-
- def load(self):
- try:
- super(InplaceFileBackingStore, self).load()
- except xapian.DatabaseCorruptError, e:
- # TODO: Try to recover in a smarter way than deleting the base
- # dir and reinitializing the index.
-
- logging.error('Error while trying to load mount point %s: %s. ' \
-                'Will try to reinitialize and load again.' % (self.base, e))
-
- # Delete the base dir and its contents
- for root, dirs, files in os.walk(self.base, topdown=False):
- for name in files:
- os.remove(os.path.join(root, name))
- os.rmdir(root)
-
- self.initialize()
- self.load()
- return
-
- # now map/update the existing data into the indexes
- # but do it async
- files_to_check = []
- for dirpath, dirname, filenames in os.walk(self.uri):
- if self.base in dirpath: continue
- if self.STORE_NAME in dirname:
- dirname.remove(self.STORE_NAME)
-
- # blacklist all the hidden directories
- if '/.' in dirpath: continue
-
- for fn in filenames:
- # ignore conventionally hidden files
- if fn.startswith("."):
- continue
- files_to_check.append((dirpath, fn))
-
- self._walk_source = gobject.idle_add(self._walk, files_to_check)
-
- def _walk(self, files_to_check):
- dirpath, fn = files_to_check.pop()
- logging.debug('InplaceFileBackingStore._walk(): %r' % fn)
- try:
- source = os.path.join(dirpath, fn)
- relative = source[len(self.uri)+1:]
-
- result, count = self.indexmanager.search(dict(filename=relative))
- mime_type = gnomevfs.get_mime_type(source)
- stat = os.stat(source)
- ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
- mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
- title = os.path.splitext(os.path.split(source)[1])[0]
- metadata = dict(filename=relative,
- mime_type=mime_type,
- ctime=ctime,
- mtime=mtime,
- title=title)
- if not count:
- # create a new record
- self.create(metadata, source)
- else:
-                # update the object with the new content if the
- # checksum is different
- # XXX: what if there is more than one? (shouldn't
- # happen)
-
- # FIXME This is throwing away all the entry metadata.
- # Disabled for trial-3. We are not doing indexing
- # anyway so it would just update the mtime which is
- # not that useful. Also the journal is currently
- # setting the mime type before saving the file making
- # the mtime check useless.
- #
- # content = result.next()
- # uid = content.id
- # saved_mtime = content.get_property('mtime')
- # if mtime != saved_mtime:
- # self.update(uid, metadata, source)
- pass
- except Exception, e:
- logging.exception('Error while processing %r: %r' % (fn, e))
-
- if files_to_check:
- return True
- else:
- self._walk_source = None
- return False
-
- def _translatePath(self, uid):
- try: content = self.indexmanager.get(uid)
- except KeyError: return None
- return os.path.join(self.uri, content.get_property('filename', uid))
-
-## def _targetFile(self, uid, target=None, ext=None, env=None):
-## # in this case the file should really be there unless it was
-##         # deleted in place or something, which typically isn't
-## # allowed
-## # XXX: catch this case and remove the index
-## targetpath = self._translatePath(uid)
-## return open(targetpath, 'rw')
-
- # File Management API
- def create_async(self, props, filelike, completion, can_move=False):
- """Inplace backing store doesn't copy, so no need for async"""
- try:
- uid = self.create(props, filelike, can_move)
- completion(None, uid)
- except Exception, exc:
- completion(exc)
-
- def _get_unique_filename(self, suggested_filename):
- # Invalid characters in VFAT filenames. From
- # http://en.wikipedia.org/wiki/File_Allocation_Table
- invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\x7F']
- invalid_chars.extend([chr(x) for x in range(0, 32)])
- filename = suggested_filename
- for char in invalid_chars:
- filename = filename.replace(char, '_')
-
- # FAT limit is 255, leave some space for uniqueness
- max_len = 250
- if len(filename) > max_len:
- name, extension = os.path.splitext(filename)
-            filename = name[0:max_len - len(extension)] + extension
-
- if os.path.exists(os.path.join(self.uri, filename)):
- i = 1
- while len(filename) <= max_len:
- name, extension = os.path.splitext(filename)
- filename = name + '_' + str(i) + extension
- if not os.path.exists(os.path.join(self.uri, filename)):
- break
- i += 1
-
- if len(filename) > max_len:
- filename = None
-
- return filename
-
- def create(self, props, filelike, can_move=False):
- # the file would have already been changed inplace
- # don't touch it
- proposed_name = None
- if filelike:
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- # usually with USB drives and the like the file we are
- # indexing is already on it, however in the case of moving
- # files to these devices we need to detect this case and
- # place the file
- proposed_name = props.get('filename', None)
- if proposed_name is None:
- proposed_name = props.get('suggested_filename', None)
- if proposed_name is None:
- proposed_name = os.path.split(filelike.name)[1]
- proposed_name = self._get_unique_filename(proposed_name)
-
- # record the name before qualifying it to the store
- props['filename'] = proposed_name
- proposed_name = os.path.join(self.uri, proposed_name)
-
- uid = self.indexmanager.index(props)
- props['uid'] = uid
- path = filelike
- if proposed_name and not os.path.exists(proposed_name):
- path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
- self.indexmanager.index(props, path)
- return uid
-
- def get(self, uid, env=None, allowMissing=False):
- content = self.indexmanager.get(uid)
- if not content: raise KeyError(uid)
- return content
-
- def update_async(self, uid, props, filelike, completion, can_move=False):
- try:
- self.update(uid, props, filelike, can_move)
- completion()
- except Exception, exc:
- completion(exc)
-
- def update(self, uid, props, filelike=None, can_move=False):
- # the file would have already been changed inplace
- # don't touch it
- props['uid'] = uid
-
- proposed_name = None
- if filelike:
- if isinstance(filelike, basestring):
- # lets treat it as a filename
- filelike = open(filelike, "r")
- filelike.seek(0)
- # usually with USB drives and the like the file we are
- # indexing is already on it, however in the case of moving
- # files to these devices we need to detect this case and
- # place the file
- proposed_name = props.get('filename', None)
- if not proposed_name:
- proposed_name = os.path.split(filelike.name)[1]
- # record the name before qualifying it to the store
- props['filename'] = proposed_name
- proposed_name = os.path.join(self.uri, proposed_name)
-
- path = filelike
- if proposed_name:
- path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
- self.indexmanager.index(props, path)
-
- def delete(self, uid):
- self._delete_external_properties(uid)
-
- c = self.indexmanager.get(uid)
- path = c.get_property('filename', None)
- self.indexmanager.delete(uid)
-
- if path:
- path = os.path.join(self.uri, path)
- if os.path.exists(path):
- os.unlink(path)
-
- def stop(self):
- if self._walk_source is not None:
- gobject.source_remove(self._walk_source)
- self.indexmanager.stop(force=True)
-
- def complete_indexing(self):
- # TODO: Perhaps we should move the inplace indexing to be sync here?
- self.indexmanager.complete_indexing()
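For orientation, a hedged sketch of driving an inplace mount; the USB path is a placeholder, and because the walk of pre-existing files runs from idle callbacks a GLib main loop must be spinning for indexing to progress:

    uri = 'inplace:/media/usbkey'        # placeholder; parse() keys on the scheme
    assert InplaceFileBackingStore.parse(uri)

    store = InplaceFileBackingStore(uri)
    store.initialize_and_load()          # creates .olpc.store/ on first use
    uids = store.ids()                   # whatever the background walk has indexed so far
    store.stop()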
-
diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py
deleted file mode 100644
index 6cf7036..0000000
--- a/src/olpc/datastore/bin_copy.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import os, subprocess
-
-
-def bin_copy(src, dest):
- try:
- subprocess.check_call(['/bin/cp', src, dest])
- except subprocess.CalledProcessError:
- raise OSError("Copy failed %s %s" % (src, dest))
-
-def bin_mv(src, dest):
- try:
- subprocess.check_call(['/bin/mv', src, dest])
- except subprocess.CalledProcessError:
- raise OSError("Move failed %s %s" % (src, dest))
-
-if __name__ == "__main__":
- import sys
- if len(sys.argv) != 3:
- raise SystemExit("usage: <src> <dest>")
-
- src, dest = sys.argv[1:]
-
- if not os.path.exists(src): raise OSError("missing src file")
-
- bin_copy(src, dest)
-
-
diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py
deleted file mode 100644
index 75f7568..0000000
--- a/src/olpc/datastore/converter.py
+++ /dev/null
@@ -1,165 +0,0 @@
-"""
-olpc.datastore.converter
-~~~~~~~~~~~~~~~~~~~~
-Convert binary formats to unicode text for indexing.
-
-Normally we'd make heavy reliance on 3rd party tools to do
-conversion. In the olpc use-case we want to minimize such
-dependencies. As such we make a minimal attempt to extract what text
-we can.
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-from olpc.datastore.utils import Singleton
-import codecs
-import logging
-import os
-import subprocess
-import sys
-import tempfile
-import gnomevfs
-
-def guess_mimetype(filename):
- fn = os.path.abspath(filename)
- mimetype = gnomevfs.get_mime_type(fn)
- return mimetype
-
-class subprocessconverter(object):
- """Process a command. Collect the output
-
- commands will have the following variables available to them for
- substitution. 'source' is required and is the input file.
- 'target' is optional, but if its omitted the subprocessconverter
- must supply an implict_target(source) method which returns the
- name of the expected output.
-
- A file object opened for reading will be returned to be passed to
- the indexer.
-
- %(source)s
- %(target)s
-
- pdftotext %(source)s %s(target)s
- """
- def __init__(self, cmd, find_target=None):
- self.raw = cmd
- self.require_target = False
- self.find_target = find_target
-
- if '%(source)s' not in cmd:
- raise ValueError("doesn't handle source")
- if '%(target)s' not in cmd:
- if not callable(find_target):
- raise ValueError("no way of locating conversion target")
- self.require_target = True
-
- def verify(self):
- """should this converter be used?"""
- return os.path.exists(self.raw.split()[0])
-
- def __call__(self, filename):
- data = {}
- data['source'] = filename
- if self.require_target:
- # XXX: methods that return something bad here
- # will result in the wrong thing being unlinked
- target = data['target'] = self.find_target(filename)
- else:
- target = data['target'] = tempfile.mkstemp()[1]
- cmd = self.raw % data
-
- try:
- cmd = cmd.split()
- # the stderr capture here will hide glib error messages
- # from converters which shouldn't be generating output anyway
- retcode = subprocess.call(cmd, stderr=subprocess.PIPE)
- if retcode: return None
- return codecs.open(target, 'r', 'utf-8')
- except UnicodeDecodeError:
- # The data was an unknown type but couldn't be understood
- # as text so we don't attempt to index it. This most
- # likely means its just an unknown binary format.
- return None
- finally:
- # we unlink the file as its already been opened for
- # reading
- if os.path.exists(target):
- os.unlink(target)
-
-class noop(object):
- def verify(self): return True
- def __call__(self, filename):
- return codecs.open(filename, 'r', 'utf-8')
-
-class Converter(object):
- __metaclass__ = Singleton
- def __init__(self):
- # maps both extension -> plugin
- # and mimetype -> plugin
- self._converters = {}
- self._default = None
- self.logger = logging.getLogger('org.laptop.sugar.Indexer')
-
- def registerConverter(self, ext_or_mime, plugin):
- if plugin.verify():
- self._converters[ext_or_mime] = plugin
- if self._default is None: self._default = plugin
-
- def __call__(self, filename, encoding=None, mimetype=None):
- """Convert filename's content to utf-8 encoded text."""
- #encoding is passed its the known encoding of the
- #contents. When None is passed the encoding is guessed which
- #can result in unexpected or no output.
- if mimetype: mt = mimetype
- else: mt = guess_mimetype(filename)
- maintype, subtype = mt.split('/',1)
-
- converter = self._converters.get(mt)
- if not converter:
- converter = self._default
- # it was an image or an unknown application
- if maintype in ['image', 'application', 'audio', 'video'] or \
- subtype in ['x-trash', 'x-python-bytecode',]:
- converter = None
-
- if converter:
- try: return converter(filename)
- except:
- logging.debug("Binary to Text failed: %s %s" %
- (mt, filename), exc_info=sys.exc_info())
-
- return None
-
-# our global instance
-converter = Converter()
-
-# TXT
-txt = noop()
-converter.registerConverter('.txt', txt)
-converter.registerConverter('.html', txt)
-converter.registerConverter('text/plain', txt)
-converter.registerConverter('text/html', txt)
-
-# PDF
-pdf2txt = subprocessconverter('/usr/bin/pdftotext -nopgbrk -enc UTF-8 %(source)s %(target)s')
-converter.registerConverter('.pdf', pdf2txt)
-converter.registerConverter('application/pdf', pdf2txt)
-
-
-# DOC
-def find_by_ext(filename, ext="txt"):
- return "%s.%s" % (os.path.splitext(filename)[0], ext)
-
-doctotext = subprocessconverter('/usr/bin/abiword -t txt %(source)s', find_by_ext)
-converter.registerConverter('.doc', doctotext)
-converter.registerConverter('application/msword', doctotext)
-
-# ODT
-converter.registerConverter('.odt', doctotext)
-converter.registerConverter('application/vnd.oasis.opendocument.text', doctotext)
-
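
The deleted converter module drove external tools through a command template: '%(source)s' names the input file and '%(target)s' the produced text file, and converters that cannot be told where to write (such as abiword) supply a find_target(source) callable instead. A condensed sketch of that substitution, with run_converter() and 'report.pdf' as illustrative names and the pdftotext template taken from the module above:

    import codecs
    import subprocess
    import tempfile

    def run_converter(template, source, find_target=None):
        if '%(target)s' in template:
            target = tempfile.mkstemp()[1]
        else:
            target = find_target(source)            # e.g. foo.doc -> foo.txt
        cmd = (template % {'source': source, 'target': target}).split()
        # Converters are expected to be quiet; swallow stderr like the original.
        if subprocess.call(cmd, stderr=subprocess.PIPE) != 0:
            return None
        # The indexer wants unicode text, so reopen the result as UTF-8.
        return codecs.open(target, 'r', 'utf-8')

    pdf_text = run_converter(
        '/usr/bin/pdftotext -nopgbrk -enc UTF-8 %(source)s %(target)s',
        'report.pdf')

The deleted code also unlinked the target file once it was open, since an open file object is all the indexer needs.
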
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index a15d5cf..8186960 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -1,22 +1,35 @@
-"""
-olpc.datastore.datastore
-~~~~~~~~~~~~~~~~~~~~~~~~
-the datastore facade
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-
+# Copyright (C) 2008, One Laptop Per Child
+# Based on code Copyright (C) 2007, ObjectRealms, LLC
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import logging
-import dbus.service
-import dbus.mainloop.glib
+import uuid
+import time
+import os
-from olpc.datastore import utils
+import dbus
+import gobject
+
+from olpc.datastore import layoutmanager
+from olpc.datastore import migration
+from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT
+from olpc.datastore.metadatastore import MetadataStore
+from olpc.datastore.indexstore import IndexStore
+from olpc.datastore.filestore import FileStore
+from olpc.datastore.optimizer import Optimizer
# the name used by the logger
DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
@@ -27,468 +40,259 @@ DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
logger = logging.getLogger(DS_LOG_CHANNEL)
-DEFAULT_LIMIT = 65536
-
class DataStore(dbus.service.Object):
-
+ """D-Bus API and logic for connecting all the other components.
+ """
def __init__(self, **options):
- self.options = options
- self.backends = []
- self.mountpoints = {}
- self.root = None
-
- # global handle to the main look
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- session_bus = dbus.SessionBus()
-
- self.bus_name = dbus.service.BusName(DS_SERVICE,
- bus=session_bus,
- replace_existing=False,
- allow_replacement=False)
- dbus.service.Object.__init__(self, self.bus_name, DS_OBJECT_PATH)
+ bus_name = dbus.service.BusName(DS_SERVICE,
+ bus=dbus.SessionBus(),
+ replace_existing=False,
+ allow_replacement=False)
+ dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
-
- ####
- ## Backend API
- ## register a set of datastore backend factories which will manage
- ## storage
- def registerBackend(self, backendClass):
- self.backends.append(backendClass)
-
- ## MountPoint API
- #@utils.sanitize_dbus
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="sa{sv}",
- out_signature='s')
- def mount(self, uri, options=None):
- """(re)Mount a new backingstore for this datastore.
- Returns the mountpoint id or an empty string to indicate failure.
- """
- # on some media we don't want to write the indexes back to the
- # medium (maybe an SD card for example) and we'd want to keep
- # that on the XO itself. In these cases their might be very
- # little identifying information on the media itself.
- uri = str(uri)
-
- _options = utils._convert(options)
- if _options is None: _options = {}
-
- mp = self.connect_backingstore(uri, **_options)
- if not mp: return ''
- if mp.id in self.mountpoints:
- self.mountpoints[mp.id].stop()
-
- mp.bind_to(self)
- self.mountpoints[mp.id] = mp
- if self.root is None:
- self.root = mp
-
- self.Mounted(mp.descriptor())
- return mp.id
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="",
- out_signature="aa{sv}")
- def mounts(self):
- """return a list of mount point descriptiors where each
- descriptor is a dict containing atleast the following keys:
- 'id' -- the id used to refer explicitly to the mount point
- 'title' -- Human readable identifier for the mountpoint
- 'uri' -- The uri which triggered the mount
- """
- return [mp.descriptor() for mp in self.mountpoints.itervalues()]
-
- #@utils.sanitize_dbus
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="s",
- out_signature="")
- def unmount(self, mountpoint_id):
- """Unmount a mountpoint by id"""
- if mountpoint_id not in self.mountpoints: return
- mp = self.mountpoints[mountpoint_id]
- mp.stop()
-
- del self.mountpoints[mountpoint_id]
- self.Unmounted(mp.descriptor())
+ layout_manager = layoutmanager.get_instance()
+ if layout_manager.get_version() == 0:
+ migration.migrate_from_0()
+ layout_manager.set_version(1)
+ layout_manager.index_updated = False
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Mounted(self, descriptior):
- """indicates that a new backingstore has been mounted by the
- datastore. Returns the mount descriptor, like mounts()"""
- pass
+ self._metadata_store = MetadataStore()
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Unmounted(self, descriptor):
- """indicates that a new backingstore has been mounted by the
- datastore. Returns the mount descriptor, like mounts()"""
- pass
-
-
- ### End Mount Points
-
- ### Backup support
- def pause(self, mountpoints=None):
- """ Deprecated. """
-
- def unpause(self, mountpoints=None):
- """ Deprecated. """
- ### End Backups
-
- def connect_backingstore(self, uri, **kwargs):
- """
- connect to a new backing store
-
- @returns: Boolean for success condition
- """
- bs = None
- for backend in self.backends:
- if backend.parse(uri) is True:
- bs = backend(uri, **kwargs)
- bs.initialize_and_load()
- # The backingstore should be ready to run
- break
- return bs
-
-
- def _resolveMountpoint(self, mountpoint=None):
- if isinstance(mountpoint, dict):
- mountpoint = mountpoint.pop('mountpoint', None)
-
- if mountpoint is not None:
- # this should be the id of a mount point
- mp = self.mountpoints[mountpoint]
+ self._index_store = IndexStore()
+ try:
+ self._index_store.open_index()
+ except Exception, e:
+ logging.error('Failed to open index, will rebuild: %r', e)
+ layout_manager.index_updated = False
+ self._index_store.remove_index()
+ self._index_store.open_index()
+
+ self._file_store = FileStore()
+
+ if not layout_manager.index_updated:
+ logging.debug('Index is not up-to-date, will update')
+ self._rebuild_index()
+
+ self._optimizer = Optimizer(self._file_store, self._metadata_store)
+
+ def _rebuild_index(self):
+ uids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with uids %r' % uids)
+ gobject.idle_add(lambda: self.__rebuild_index_cb(uids),
+ priority=gobject.PRIORITY_LOW)
+
+ def __rebuild_index_cb(self, uids):
+ if uids:
+ uid = uids.pop()
+
+ logging.debug('Updating entry %r in index. %d to go.' % \
+ (uid, len(uids)))
+
+ if not self._index_store.contains(uid):
+ props = self._metadata_store.retrieve(uid)
+ self._index_store.store(uid, props)
+
+ if not uids:
+ logging.debug('Finished updating index.')
+ layoutmanager.get_instance().index_updated = True
+ return False
else:
- # the first one is the default
- mp = self.root
- return mp
+ return True
- def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None):
- logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid))
+ def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
+ (async_cb, async_err_cb, uid, exc))
if exc is not None:
async_err_cb(exc)
return
self.Created(uid)
+ self._optimizer.optimize(uid)
logger.debug("created %s" % uid)
async_cb(uid)
- # PUBLIC API
- #@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='a{sv}sb',
out_signature='s',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
- def create(self, props, filelike=None, transfer_ownership=False, async_cb=None, async_err_cb=None):
- """create a new entry in the datastore. If a file is passed it
- will be consumed by the datastore. Because the repository has
- a checkin/checkout model this will create a copy of the file
- in the repository. Changes to this file will not automatically
- be be saved. Rather it is recorded in its current state.
-
- When many backing stores are associated with a datastore
- new objects are created in the first datastore. More control
- over this process can come at a later time.
- """
- mp = self._resolveMountpoint(props)
- mp.create_async(props, filelike, can_move=transfer_ownership,
- completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args))
+ def create(self, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ uid = str(uuid.uuid4())
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Created(self, uid): pass
-
- def _single_search(self, mountpoint, query, order_by, limit, offset):
- results, count = mountpoint.find(query.copy(), order_by,
- limit + offset, offset)
- return list(results), count, 1
-
- def _multiway_search(self, query, order_by=None, limit=None, offset=None):
- mountpoints = query.pop('mountpoints', self.mountpoints)
- mountpoints = [self.mountpoints[str(m)] for m in mountpoints]
-
- if len(mountpoints) == 1:
- # Fast path the single mountpoint case
- return self._single_search(mountpoints[0], query,
- order_by, limit, offset)
-
- results = []
- # XXX: the merge will become *much* more complex in when
- # distributed versioning is implemented.
- # collect
- # some queries mutate the query-dict so we pass a copy each
- # time
- for mp in mountpoints:
- result, count = mp.find(query.copy(), order_by, limit)
- results.append(result)
-
- # merge
- d = {}
- for res in results:
- for hit in res:
- existing = d.get(hit.id)
- if not existing or \
- existing.get_property('mtime') < hit.get_property('mtime'):
- # XXX: age/version check
- d[hit.id] = hit
- return d, len(d), len(results)
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._create_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='as')
- def ids(self, mountpoint=None):
- """return all the ids of objects living on a given
- mountpoint"""
- if str(mountpoint) == "": mountpoint=None
- mp = self._resolveMountpoint(mountpoint)
- return mp.ids()
-
-
- #@utils.sanitize_dbus
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}as',
- out_signature='aa{sv}u')
- def find(self, query=None, properties=None, **kwargs):
- """find(query)
- takes a dict of parameters and returns data in the following
- format
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Created(self, uid):
+ pass
- (results, count)
+ def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \
+ (async_cb, async_err_cb, exc))
+ if exc is not None:
+ async_err_cb(exc)
+ return
- where results are:
- [ {props}, {props}, ... ]
+ self.Updated(uid)
+ self._optimizer.optimize(uid)
+ logger.debug("updated %s" % uid)
+ async_cb()
- which is to be read, results is an ordered list of property
- dicts, akin to what is returned from get_properties. 'uid' is
- included in the properties dict as well and is the unique
- identifier used in subsequent calls to refer to that object.
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def update(self, uid, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+
+ if os.path.exists(self._file_store.get_file_path(uid)) and \
+ (not file_path or os.path.exists(file_path)):
+ self._optimizer.remove(uid)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._update_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
- special keywords in the query that are supported are more
- fully documented in the query.py::find method docstring.
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Updated(self, uid):
+ pass
- The 'include_files' keyword will trigger the availability of
- user accessible files. Because these are working copies we
- don't want to generate them unless needed. In the case the
- the full properties set matches doing the single roundtrip
- to start an activity makes sense.
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='a{sv}as',
+ out_signature='aa{sv}u')
+ def find(self, query, properties):
+ t = time.time()
- To order results by a given property you can specify:
- >>> ds.find(order_by=['author', 'title'])
+ if not layoutmanager.get_instance().index_updated:
+ logging.warning('Index updating, returning all entries')
- Order by must be a list of property names given in the order
- of decreasing precedence.
+ uids = layoutmanager.get_instance().find_all()
+ count = len(uids)
- """
- # only goes to the primary now. Punting on the merge case
- if isinstance(query, dict):
- kwargs.update(query)
+ offset = query.get('offset', 0)
+ limit = query.get('limit', MAX_QUERY_LIMIT)
+            uids = uids[offset:offset + limit]
else:
- if 'query' not in kwargs:
- kwargs['query'] = query
-
- include_files = kwargs.pop('include_files', False)
- order_by = kwargs.pop('order_by', [])
-
- # XXX: this is a workaround, deal properly with n backends
- limit = kwargs.pop('limit', DEFAULT_LIMIT)
- offset = kwargs.pop('offset', 0)
-
- # distribute the search to all the mountpoints unless a
- # backingstore id set is specified
- # backends may be able to return sorted results, if there is
- # only a single backend in the query we can use pre-sorted
- # results directly
- results, count, results_from = self._multiway_search(kwargs,
- order_by,
- limit, offset)
+ try:
+ uids, count = self._index_store.find(query)
+ except Exception, e:
+ logging.error('Failed to query index, will rebuild: %r', e)
+ layoutmanager.get_instance().index_updated = False
+ self._index_store.close_index()
+ self._index_store.remove_index()
+ self._index_store.open_index()
+ self._rebuild_index()
+
+ entries = []
+ for uid in uids:
+ metadata = self._metadata_store.retrieve(uid, properties)
+ # Hack because the current journal expects the mountpoint property
+ # to be present.
+ metadata['mountpoint'] = '1'
+ entries.append(metadata)
+ logger.debug('find(): %r' % (time.time() - t))
+ return entries, count
-
- # ordering is difficult when we are dealing with sets from
- # more than one source. The model is this.
- # order by the primary (first) sort criteria, then do the rest
- # in post processing. This allows use to assemble partially
- # database sorted results from many sources and quickly
- # combine them.
- if results_from > 1:
- if order_by:
- # resolve key names to columns
- if isinstance(order_by, basestring):
- order_by = [o.strip() for o in order_by.split(',')]
-
- if not isinstance(order_by, list):
- logger.debug("bad query, order_by should be a list of property names")
- order_by = None
-
- # generate a sort function based on the complete set of
- # ordering criteria which includes the primary sort
- # criteria as well to keep it stable.
- def comparator(a, b):
- # we only sort on properties so
- for criteria in order_by:
- mode = 1 # ascending
- if criteria.startswith('-'):
- mode = -1
- criteria = criteria[1:]
- pa = a.get_property(criteria, None)
- pb = b.get_property(criteria, None)
- r = cmp(pa, pb) * mode
- if r != 0: return r
- return 0
-
-
- r = results.values()
- r.sort(comparator)
- results = r
-
- results = results[offset:limit+offset]
- else:
- results = results.values()
-
- d = []
- c = 0
- for r in results:
- props = r.properties
- props['uid'] = r.id
- props['mountpoint'] = r.backingstore.id
- props['filename'] = ''
- d.append(props)
-
- if properties:
- for name in props.keys():
- if name not in properties:
- del props[name]
- c+= 1
- if limit and c > limit: break
-
- return (d, count)
-
- def get(self, uid):
- mp = self._resolveMountpoint()
- c = None
- try:
- c = mp.get(uid)
- if c: return c
- except KeyError:
- pass
-
- if not c:
- for mp in self.mountpoints.itervalues():
- try:
- c = mp.get(uid)
- if c: break
- except KeyError:
- continue
- return c
-
- #@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='s',
out_signature='s',
sender_keyword='sender')
def get_filename(self, uid, sender=None):
- content = self.get(uid)
- if content:
- # Assign to the backing store the uid of the process that called
- # this method. This is needed for copying the file in the right
- # place.
- backingstore = content.backingstore
- backingstore.current_user_id = dbus.Bus().get_unix_user(sender)
- try:
- # Retrieving the file path for the file will cause the file to be
- # copied or linked to a directory accessible by the caller.
- file_path = content.filename
- except AttributeError:
- file_path = ''
- finally:
- backingstore.current_user_id = None
- return file_path
-
- #@utils.sanitize_dbus
+ user_id = dbus.Bus().get_unix_user(sender)
+ return self._file_store.retrieve(uid, user_id)
+
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='s',
out_signature='a{sv}')
def get_properties(self, uid):
- content = self.get(uid)
- props = content.properties
- props['mountpoint'] = content.backingstore.id
- return props
+ metadata = self._metadata_store.retrieve(uid)
+ # Hack because the current journal expects the mountpoint property to be
+ # present.
+ metadata['mountpoint'] = '1'
+ return metadata
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='sa{sv}',
out_signature='as')
def get_uniquevaluesfor(self, propertyname, query=None):
- propertyname = str(propertyname)
-
- if not query: query = {}
- mountpoints = query.pop('mountpoints', self.mountpoints)
- mountpoints = [self.mountpoints[str(m)] for m in mountpoints]
- results = set()
-
- for mp in mountpoints:
- result = mp.get_uniquevaluesfor(propertyname)
- results = results.union(result)
- return results
-
- def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None):
- logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc))
- if exc is not None:
- async_err_cb(exc)
- return
-
- self.Updated(content.id)
- logger.debug("updated %s" % content.id)
- async_cb()
-
- #@utils.sanitize_dbus
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}sb',
- out_signature='',
- async_callbacks=('async_cb', 'async_err_cb'),
- byte_arrays=True)
- def update(self, uid, props, filelike=None, transfer_ownership=False,
- async_cb=None, async_err_cb=None):
- """Record the current state of the object checked out for a
- given uid. If contents have been written to another file for
- example. You must create it
- """
- content = self.get(uid)
- mountpoint = props.pop('mountpoint', None)
- content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership,
- completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args))
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Updated(self, uid): pass
+ if propertyname != 'activity':
+            raise ValueError("Only 'activity' is a supported property name")
+ if query:
+ raise ValueError('The query parameter is not supported')
+ if layoutmanager.get_instance().index_updated:
+ return self._index_store.get_activities()
+ else:
+ logging.warning('Index updating, returning an empty list')
+ return []
- #@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='s',
out_signature='')
def delete(self, uid):
- content = self.get(uid)
- if content:
- content.backingstore.delete(uid)
+ self._optimizer.remove(uid)
+
+ self._index_store.delete(uid)
+ self._file_store.delete(uid)
+ self._metadata_store.delete(uid)
+
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ os.removedirs(entry_path)
+
self.Deleted(uid)
logger.debug("deleted %s" % uid)
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Deleted(self, uid): pass
+ def Deleted(self, uid):
+ pass
def stop(self):
"""shutdown the service"""
+ self._index_store.close_index()
self.Stopped()
- self._connection.get_connection()._unregister_object_path(DS_OBJECT_PATH)
- for mp in self.mountpoints.values(): mp.stop()
-
@dbus.service.signal(DS_DBUS_INTERFACE)
- def Stopped(self): pass
+ def Stopped(self):
+ pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='',
- out_signature='')
- def complete_indexing(self):
- """Block waiting for all queued indexing operations to
- complete. Used mostly in testing"""
- for mp in self.mountpoints.itervalues():
- mp.complete_indexing()
-
+ in_signature="sa{sv}",
+ out_signature='s')
+ def mount(self, uri, options=None):
+ return ''
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="",
+ out_signature="aa{sv}")
+ def mounts(self):
+ return [{'id': 1}]
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="s",
+ out_signature="")
+ def unmount(self, mountpoint_id):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+    def Mounted(self, descriptor):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+ def Unmounted(self, descriptor):
+ pass
+
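
The rewritten datastore.py keeps the public D-Bus methods (create, update, find, get_filename, get_properties, get_uniquevaluesfor, delete) but replaces mountpoints and backingstores with four collaborators: MetadataStore, IndexStore, FileStore and the Optimizer; the mount-related methods survive only as stubs for the current journal. A minimal client sketch, assuming the bus name matches the object path shown above ('org.laptop.sugar.DataStore') and with purely illustrative property values and paths:

    import dbus

    bus = dbus.SessionBus()
    obj = bus.get_object('org.laptop.sugar.DataStore',
                         '/org/laptop/sugar/DataStore')
    data_store = dbus.Interface(obj, 'org.laptop.sugar.DataStore')

    # create() now generates the uid itself and stores the metadata, the
    # index terms and the file in one call.
    uid = data_store.create({'title': 'example',
                             'mime_type': 'text/plain',
                             'activity': 'org.laptop.WebActivity',
                             'activity_id': ''},
                            '/tmp/example.txt', False)

    # find() returns (entries, count); every entry carries the fake
    # 'mountpoint' property the journal still expects.
    entries, count = data_store.find({'limit': 10}, ['uid', 'title'])
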
diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py
new file mode 100644
index 0000000..9662885
--- /dev/null
+++ b/src/olpc/datastore/filestore.py
@@ -0,0 +1,205 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import errno
+import logging
+
+import gobject
+
+from olpc.datastore import layoutmanager
+
+class FileStore(object):
+ """Handle the storage of one file per entry.
+ """
+ # TODO: add protection against store and retrieve operations on entries
+ # that are being processed async.
+
+ def store(self, uid, file_path, transfer_ownership, completion_cb):
+ """Store a file for a given entry.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ if not os.path.exists(dir_path):
+ os.makedirs(dir_path)
+
+ destination_path = os.path.join(dir_path, 'data')
+ if file_path:
+ if not os.path.isfile(file_path):
+ raise ValueError('No file at %r' % file_path)
+ if transfer_ownership:
+ try:
+ logging.debug('FileStore moving from %r to %r' % \
+ (file_path, destination_path))
+ os.rename(file_path, destination_path)
+ completion_cb()
+ except OSError, e:
+ if e.errno == errno.EXDEV:
+ self._async_copy(file_path, destination_path,
+ completion_cb)
+ else:
+ raise
+ else:
+ self._async_copy(file_path, destination_path, completion_cb)
+ elif not file_path and os.path.exists(destination_path):
+ os.remove(destination_path)
+ completion_cb()
+ else:
+ logging.debug('FileStore: Nothing to do')
+ completion_cb()
+
+ def _async_copy(self, file_path, destination_path, completion_cb):
+ """Start copying a file asynchronously.
+
+ """
+ logging.debug('FileStore copying from %r to %r' % \
+ (file_path, destination_path))
+ async_copy = AsyncCopy(file_path, destination_path, completion_cb)
+ async_copy.start()
+
+ def retrieve(self, uid, user_id):
+        """Place the file associated with a given entry into a directory where
+        the user can read it. The caller is responsible for deleting this file.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_path = os.path.join(dir_path, 'data')
+ if not os.path.exists(file_path):
+ return ''
+
+ use_instance_dir = os.path.exists('/etc/olpc-security') and \
+ os.getuid() != user_id
+ if use_instance_dir:
+ if not user_id:
+            raise ValueError("Couldn't determine the current user uid.")
+ destination_dir = os.path.join(os.environ['HOME'], 'isolation', '1',
+ 'uid_to_instance_dir', str(user_id))
+ else:
+ profile = os.environ.get('SUGAR_PROFILE', 'default')
+ destination_dir = os.path.join(os.path.expanduser('~'), '.sugar',
+ profile, 'data')
+ if not os.path.exists(destination_dir):
+ os.makedirs(destination_dir)
+
+ destination_path = os.path.join(destination_dir, uid)
+
+ # Try to make the original file readable. This can fail if the file is
+ # in a FAT filesystem.
+ try:
+ os.chmod(file_path, 0604)
+ except OSError, e:
+ if e.errno != errno.EPERM:
+ raise
+
+ # Try to hard link from the original file to the targetpath. This can
+ # fail if the file is in a different filesystem. Do a symlink instead.
+ try:
+ os.link(file_path, destination_path)
+ except OSError, e:
+ if e.errno == errno.EXDEV:
+ os.symlink(file_path, destination_path)
+ else:
+ raise
+
+ return destination_path
+
+ def get_file_path(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ return os.path.join(dir_path, 'data')
+
+ def delete(self, uid):
+ """Remove the file associated to a given entry.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_path = os.path.join(dir_path, 'data')
+ if os.path.exists(file_path):
+ os.remove(file_path)
+
+ def hard_link_entry(self, new_uid, existing_uid):
+ existing_file = os.path.join(
+ layoutmanager.get_instance().get_entry_path(existing_uid),
+ 'data')
+ new_file = os.path.join(
+ layoutmanager.get_instance().get_entry_path(new_uid),
+ 'data')
+
+ logging.debug('removing %r' % new_file)
+ os.remove(new_file)
+
+ logging.debug('hard linking %r -> %r' % (new_file, existing_file))
+ os.link(existing_file, new_file)
+
+class AsyncCopy(object):
+ """Copy a file in chunks in the idle loop.
+
+ """
+ CHUNK_SIZE = 65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logging.error('AC: Error writing %s -> %s: wrote less than '
+ 'expected' % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError(
+ 'Error writing data to destination file'))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ self._cleanup()
+ self.completion(None)
+ return False
+ except Exception, err:
+ logging.error("AC: Error copying %s -> %s: %r" % \
+ (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT,
+ 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ gobject.idle_add(self._copy_block)
+
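
FileStore only needs AsyncCopy when os.rename() fails with EXDEV, i.e. when the incoming file sits on another filesystem (a USB stick, for instance); the copy then runs in CHUNK_SIZE blocks from the GLib idle loop so the D-Bus service keeps answering requests. A usage sketch outside the datastore, with illustrative paths:

    import gobject

    from olpc.datastore.filestore import AsyncCopy

    loop = gobject.MainLoop()

    def done(error=None):
        # AsyncCopy calls back with None on success or with the exception.
        if error is not None:
            print 'copy failed:', error
        loop.quit()

    AsyncCopy('/media/usb/report.pdf', '/tmp/report.pdf', done).start()
    loop.run()
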
diff --git a/src/olpc/datastore/indexstore.py b/src/olpc/datastore/indexstore.py
new file mode 100644
index 0000000..ec97b4a
--- /dev/null
+++ b/src/olpc/datastore/indexstore.py
@@ -0,0 +1,234 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import logging
+import time
+import os
+
+import gobject
+import xapian
+from xapian import WritableDatabase, Document, Enquire, Query, QueryParser
+
+from olpc.datastore import layoutmanager
+from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT
+
+_VALUE_UID = 0
+_VALUE_TIMESTAMP = 1
+_VALUE_ACTIVITY_ID = 2
+_VALUE_MIME_TYPE = 3
+_VALUE_ACTIVITY = 4
+_VALUE_KEEP = 5
+
+_PREFIX_UID = 'Q'
+_PREFIX_ACTIVITY = 'A'
+_PREFIX_MIME_TYPE = 'M'
+
+# Force a flush every _n_ changes to the db
+_FLUSH_THRESHOLD = 20
+
+# Force a flush after _n_ seconds since the last change to the db
+_FLUSH_TIMEOUT = 60
+
+_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview']
+
+_MAX_RESULTS = int(2 ** 31 - 1)
+
+class IndexStore(object):
+ """Index metadata and provide rich query facilities on it.
+ """
+ def __init__(self):
+ self._database = None
+ self._flush_timeout = None
+ self._pending_writes = 0
+
+ def open_index(self):
+ index_path = layoutmanager.get_instance().get_index_path()
+ self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN)
+
+ def close_index(self):
+ self._database.flush()
+ self._database = None
+
+ def remove_index(self):
+ index_path = layoutmanager.get_instance().get_index_path()
+ for f in os.listdir(index_path):
+ os.remove(os.path.join(index_path, f))
+
+ def contains(self, uid):
+ postings = self._database.postlist(_PREFIX_UID + uid)
+ try:
+ postlist_item = postings.next()
+ except StopIteration:
+ return False
+ return True
+
+ def store(self, uid, properties):
+ document = Document()
+ document.add_term(_PREFIX_UID + uid)
+ document.add_term(_PREFIX_ACTIVITY + properties['activity'])
+ document.add_term(_PREFIX_MIME_TYPE + properties['mime_type'])
+
+ document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp']))
+ document.add_value(_VALUE_ACTIVITY_ID, properties['activity_id'])
+ document.add_value(_VALUE_MIME_TYPE, str(properties['mime_type']))
+ document.add_value(_VALUE_ACTIVITY, properties['activity'])
+
+ term_generator = xapian.TermGenerator()
+
+ # TODO: we should do stemming, but in which language?
+ #if language is not None:
+ # term_generator.set_stemmer(_xapian.Stem(language))
+
+ # TODO: we should use a stopper
+ #if stop is not None:
+ # stopper = _xapian.SimpleStopper()
+ # for term in stop:
+ # stopper.add (term)
+ # term_generator.set_stopper (stopper)
+
+ term_generator.set_document(document)
+ term_generator.index_text_without_positions(
+ self._extract_text(properties), 1, '')
+
+ if not self.contains(uid):
+ self._database.add_document(document)
+ else:
+ self._database.replace_document(_PREFIX_UID + uid, document)
+ self._flush()
+
+ def _extract_text(self, properties):
+ text = ''
+ for key, value in properties.items():
+ if key not in _PROPERTIES_NOT_TO_INDEX:
+ if text:
+ text += ' '
+ text += str(value)
+ return text
+
+ def find(self, query):
+ enquire = Enquire(self._database)
+ enquire.set_query(self._parse_query(query))
+
+ offset = query.get('offset', 0)
+ limit = query.get('limit', MAX_QUERY_LIMIT)
+
+        # This will ensure that the results count is exact.
+ check_at_least = offset + limit + 1
+
+ enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
+
+ query_result = enquire.get_mset(offset, limit, check_at_least)
+ total_count = query_result.get_matches_estimated()
+
+ uids = []
+ for hit in query_result:
+ uids.append(hit.document.get_value(_VALUE_UID))
+
+ return (uids, total_count)
+
+ def _parse_query(self, query_dict):
+ logging.debug('_parse_query %r' % query_dict)
+ queries = []
+
+ if query_dict.has_key('query'):
+ query_parser = QueryParser()
+ query_parser.set_database(self._database)
+ #query_parser.set_default_op(Query.OP_AND)
+
+ # TODO: we should do stemming, but in which language?
+ #query_parser.set_stemmer(_xapian.Stem(lang))
+ #query_parser.set_stemming_strategy(qp.STEM_SOME)
+
+ query = query_parser.parse_query(
+ query_dict['query'],
+ QueryParser.FLAG_PHRASE |
+ QueryParser.FLAG_BOOLEAN |
+ QueryParser.FLAG_LOVEHATE |
+ QueryParser.FLAG_WILDCARD,
+ '')
+
+ queries.append(query)
+
+ self._replace_mtime(query_dict)
+ if query_dict.has_key('timestamp'):
+ start = str(query_dict['timestamp'].pop('start', 0))
+ end = str(query_dict['timestamp'].pop('end', _MAX_RESULTS))
+ query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end)
+ queries.append(query)
+
+ if query_dict.has_key('uid'):
+ queries.append(Query(_PREFIX_UID + query_dict['uid']))
+
+ if query_dict.has_key('activity'):
+ queries.append(Query(_PREFIX_ACTIVITY + query_dict['activity']))
+
+ if query_dict.has_key('mime_type'):
+ mime_queries = []
+ for mime_type in query_dict['mime_type']:
+ mime_queries.append(Query(_PREFIX_MIME_TYPE + mime_type))
+ queries.append(Query(Query.OP_OR, mime_queries))
+
+ if not queries:
+ queries.append(Query(''))
+
+ return Query(Query.OP_AND, queries)
+
+ def _replace_mtime(self, query):
+ # TODO: Just a hack for the current journal that filters by mtime
+ DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
+ if query.has_key('mtime'):
+ mtime_range = query.pop('mtime')
+
+ start = mtime_range['start'][:-7]
+ start = time.mktime(time.strptime(start, DATE_FORMAT))
+
+ end = mtime_range['end'][:-7]
+ # FIXME: this will give an unexpected result if the journal is in a
+ # different timezone
+ end = time.mktime(time.strptime(end, DATE_FORMAT))
+
+ query['timestamp'] = {'start': int(start), 'end': int(end)}
+
+ def delete(self, uid):
+ self._database.delete_document(_PREFIX_UID + uid)
+
+ def get_activities(self):
+ activities = []
+ for term in self._database.allterms(_PREFIX_ACTIVITY):
+ activities.append(term.term[len(_PREFIX_ACTIVITY):])
+ return activities
+
+ def _flush_timeout_cb(self):
+ self._flush(True)
+ return False
+
+ def _flush(self, force=False):
+ """Called after any database mutation"""
+ logging.debug('IndexStore.flush: %r %r' % (force, self._pending_writes))
+
+ if self._flush_timeout is not None:
+ gobject.source_remove(self._flush_timeout)
+ self._flush_timeout = None
+
+ self._pending_writes += 1
+ if force or self._pending_writes > _FLUSH_THRESHOLD:
+ self._database.flush()
+ self._pending_writes = 0
+ else:
+ self._flush_timeout = gobject.timeout_add(_FLUSH_TIMEOUT * 1000,
+ self._flush_timeout_cb)
+
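
IndexStore records uid, activity and mime_type as boolean terms (prefixes 'Q', 'A' and 'M') and keeps the timestamp in value slot 1, so a journal query dict becomes an OP_AND of term queries plus an OP_VALUE_RANGE over that slot. A standalone sketch of the same construction against an already-built index (the index path and activity are illustrative):

    import xapian

    def build_query(uid=None, activity=None, start=0, end=2 ** 31 - 1):
        # Mirror IndexStore._parse_query(): AND together term queries and
        # a value range over the timestamp slot.
        subqueries = []
        if uid:
            subqueries.append(xapian.Query('Q' + uid))
        if activity:
            subqueries.append(xapian.Query('A' + activity))
        subqueries.append(xapian.Query(xapian.Query.OP_VALUE_RANGE, 1,
                                       str(start), str(end)))
        return xapian.Query(xapian.Query.OP_AND, subqueries)

    database = xapian.Database('/tmp/datastore/index')
    enquire = xapian.Enquire(database)
    enquire.set_query(build_query(activity='org.laptop.WebActivity'))
    enquire.set_sort_by_value(1, True)      # same timestamp ordering as find()
    for match in enquire.get_mset(0, 40960):
        print match.document.get_value(0)   # slot 0 holds the uid
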
diff --git a/src/olpc/datastore/layoutmanager.py b/src/olpc/datastore/layoutmanager.py
new file mode 100644
index 0000000..2b3e521
--- /dev/null
+++ b/src/olpc/datastore/layoutmanager.py
@@ -0,0 +1,101 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+
+MAX_QUERY_LIMIT = 40960
+
+class LayoutManager(object):
+    """Provide the logic for how entries are laid out inside the
+    datastore directory.
+ """
+ def __init__(self):
+ profile = os.environ.get('SUGAR_PROFILE', 'default')
+ base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
+
+ self._root_path = os.path.join(base_dir, 'datastore')
+
+ if not os.path.exists(self._root_path):
+ os.makedirs(self._root_path)
+ self.set_version(1)
+
+ self._create_if_needed(self.get_checksums_dir())
+ self._create_if_needed(self.get_queue_path())
+
+ index_updated_path = os.path.join(self._root_path, 'index_updated')
+ self._index_updated = os.path.exists(index_updated_path)
+
+ def _create_if_needed(self, path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+ def get_version(self):
+ version_path = os.path.join(self._root_path, 'version')
+ version = 0
+ if os.path.exists(version_path):
+ version = int(open(version_path, 'r').read())
+ return version
+
+ def set_version(self, version):
+ version_path = os.path.join(self._root_path, 'version')
+ open(version_path, 'w').write(str(version))
+
+ def get_entry_path(self, uid):
+ # os.path.join() is just too slow
+ return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+
+ def get_root_path(self):
+ return self._root_path
+
+ def get_index_path(self):
+ return os.path.join(self._root_path, 'index')
+
+ def get_checksums_dir(self):
+ return os.path.join(self._root_path, 'checksums')
+
+ def get_queue_path(self):
+ return os.path.join(self.get_checksums_dir(), 'queue')
+
+ def _is_index_updated(self):
+ return self._index_updated
+
+ def _set_index_updated(self, index_updated):
+ if index_updated != self._index_updated:
+ self._index_updated = index_updated
+
+ index_updated_path = os.path.join(self._root_path, 'index_updated')
+ if os.path.exists(index_updated_path):
+ os.remove(index_updated_path)
+ else:
+ open(index_updated_path, 'w').close()
+
+ index_updated = property(_is_index_updated, _set_index_updated)
+
+ def find_all(self):
+ uids = []
+ for f in os.listdir(self._root_path):
+ if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
+ for g in os.listdir(os.path.join(self._root_path, f)):
+ if len(g) == 36:
+ uids.append(g)
+ return uids
+
+_instance = None
+def get_instance():
+ global _instance
+ if _instance is None:
+ _instance = LayoutManager()
+ return _instance
+
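
Entries are sharded by the first two characters of their uid under ~/.sugar/<profile>/datastore, which keeps any single directory small, and find_all() recognises entries by the 36-character length of a uuid4 string. For example, with an illustrative uid and the default profile:

    from olpc.datastore import layoutmanager

    uid = '0af4e43b-e8b2-4e7c-8d8a-5f2a3d4e5f6a'
    lm = layoutmanager.get_instance()
    print lm.get_entry_path(uid)
    # -> $HOME/.sugar/default/datastore/0a/0af4e43b-e8b2-4e7c-8d8a-5f2a3d4e5f6a
    #    the payload lives in .../data, the properties in .../metadata/<key>
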
diff --git a/src/olpc/datastore/metadatareader.c b/src/olpc/datastore/metadatareader.c
new file mode 100644
index 0000000..ce6d38e
--- /dev/null
+++ b/src/olpc/datastore/metadatareader.c
@@ -0,0 +1,199 @@
+#include "Python.h"
+
+#include <dirent.h>
+
+// TODO: put it in a place where python can use it when writing metadata
+#define MAX_PROPERTY_LENGTH 500 * 1024
+
+static PyObject *byte_array_type = NULL;
+
+static PyObject *
+metadatareader_retrieve(PyObject *unused, PyObject *args)
+{
+ PyObject *dict = NULL;
+ PyObject *properties = NULL;
+ const char *dir_path = NULL;
+ char *metadata_path = NULL;
+ DIR *dir_stream = NULL;
+ struct dirent *dir_entry = NULL;
+ char *file_path = NULL;
+ FILE *file = NULL;
+ char *value_buf = NULL;
+
+ if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties))
+ return NULL;
+
+ // Build path to the metadata directory
+ int metadata_path_size = strlen(dir_path) + 10;
+ metadata_path = PyMem_Malloc(metadata_path_size);
+ if (metadata_path == NULL) {
+ PyErr_NoMemory();
+ goto cleanup;
+ }
+ snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata");
+
+ dir_stream = opendir (metadata_path);
+ if (dir_stream == NULL) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Couldn't open metadata directory %s",
+ metadata_path);
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ dict = PyDict_New();
+
+ dir_entry = readdir(dir_stream);
+ while (dir_entry != NULL) {
+ long file_size;
+ int file_path_size;
+ PyObject *value = NULL;
+
+ // Skip . and ..
+ if (dir_entry->d_name[0] == '.' &&
+ (strlen(dir_entry->d_name) == 1 ||
+ (dir_entry->d_name[1] == '.' &&
+ strlen(dir_entry->d_name) == 2)))
+ goto next_property;
+
+ // Check if the property is in the properties list
+ if ((properties != Py_None) && (PyList_Size(properties) > 0)) {
+ int found = 0;
+ int i;
+ for (i = 0; i < PyList_Size(properties); i++) {
+ PyObject *property = PyList_GetItem(properties, i);
+ if (!strcmp (dir_entry->d_name, PyString_AsString (property))) {
+ found = 1;
+ }
+ }
+ if (!found) {
+ goto next_property;
+ }
+ }
+
+ // Build path of the property file
+ file_path_size = strlen(metadata_path) + 1 + strlen(dir_entry->d_name) +
+ 1;
+ file_path = PyMem_Malloc(file_path_size);
+ if (file_path == NULL) {
+ PyErr_NoMemory();
+ goto cleanup;
+ }
+ snprintf (file_path, file_path_size, "%s/%s", metadata_path,
+ dir_entry->d_name);
+
+ file = fopen(file_path, "r");
+ if (file == NULL) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Cannot open property file %s: %s",
+ file_path, strerror(errno));
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ // Get file size
+ fseek (file, 0, SEEK_END);
+ file_size = ftell (file);
+ rewind (file);
+
+ if (file_size == 0) {
+ // Empty property
+ value = PyString_FromString("");
+ if (value == NULL) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to convert value to python string");
+ goto cleanup;
+ }
+ } else {
+ if (file_size > MAX_PROPERTY_LENGTH) {
+ PyErr_SetString(PyExc_ValueError, "Property file too big");
+ goto cleanup;
+ }
+
+ // Read the whole file
+ value_buf = PyMem_Malloc(file_size);
+ if (value_buf == NULL) {
+ PyErr_NoMemory();
+ goto cleanup;
+ }
+ long read_size = fread(value_buf, 1, file_size, file);
+ if (read_size < file_size) {
+ char buf[256];
+ snprintf(buf, sizeof(buf),
+ "Error while reading property file %s", file_path);
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ // Convert value to dbus.ByteArray
+ PyObject *args = Py_BuildValue("(s#)", value_buf, file_size);
+ value = PyObject_CallObject(byte_array_type, args);
+ if (value == NULL) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to convert value to dbus.ByteArray");
+ goto cleanup;
+ }
+ }
+
+ // Add property to the metadata dict
+ if (PyDict_SetItemString(dict, dir_entry->d_name, value) == -1) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to add property to dictionary");
+ goto cleanup;
+ }
+
+ next_property:
+ if (file_path) {
+ PyMem_Free(file_path);
+ file_path = NULL;
+ }
+ if (file) {
+ fclose(file);
+ file = NULL;
+ }
+ if (value_buf) {
+ PyMem_Free(value_buf);
+ value_buf = NULL;
+ }
+
+ dir_entry = readdir(dir_stream);
+ }
+
+ closedir(dir_stream);
+
+ return dict;
+
+cleanup:
+ if (file_path) {
+ PyMem_Free(file_path);
+ }
+ if (value_buf) {
+ PyMem_Free(value_buf);
+ }
+ if (dict) {
+ Py_DECREF(dict);
+ }
+ if (file) {
+ fclose(file);
+ }
+ if (dir_stream) {
+ closedir(dir_stream);
+ }
+ return NULL;
+}
+
+static PyMethodDef metadatareader_functions[] = {
+    {"retrieve", metadatareader_retrieve, METH_VARARGS,
+     PyDoc_STR("Read an entry's metadata directory into a dictionary")},
+ {NULL, NULL, 0, NULL}
+};
+
+PyMODINIT_FUNC
+initmetadatareader(void)
+{
+ PyObject* mod;
+ mod = Py_InitModule("metadatareader", metadatareader_functions);
+
+ PyObject *dbus_module = PyImport_ImportModule("dbus");
+ byte_array_type = PyObject_GetAttrString(dbus_module, "ByteArray");
+}
+
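
metadatareader.c walks <entry>/metadata/, reads each property file (refusing anything larger than MAX_PROPERTY_LENGTH) and returns a dict whose values are dbus.ByteArray objects, optionally restricted to a list of property names. A rough pure-Python equivalent of that contract, for reference only and not part of the build:

    import os

    import dbus

    MAX_PROPERTY_LENGTH = 500 * 1024

    def retrieve(dir_path, properties=None):
        metadata_path = os.path.join(dir_path, 'metadata')
        metadata = {}
        for name in os.listdir(metadata_path):
            if properties and name not in properties:
                continue
            data = open(os.path.join(metadata_path, name), 'rb').read()
            if len(data) > MAX_PROPERTY_LENGTH:
                raise ValueError('Property file too big')
            if data:
                metadata[name] = dbus.ByteArray(data)
            else:
                metadata[name] = ''
        return metadata
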
diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py
new file mode 100644
index 0000000..8d1e377
--- /dev/null
+++ b/src/olpc/datastore/metadatastore.py
@@ -0,0 +1,50 @@
+import os
+
+from olpc.datastore import layoutmanager
+from olpc.datastore import metadatareader
+
+MAX_SIZE = 256
+
+class MetadataStore(object):
+ def store(self, uid, metadata):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ if not os.path.exists(dir_path):
+ os.makedirs(dir_path)
+
+ metadata_path = os.path.join(dir_path, 'metadata')
+ if not os.path.exists(metadata_path):
+ os.makedirs(metadata_path)
+ else:
+ for key in os.listdir(metadata_path):
+ os.remove(os.path.join(metadata_path, key))
+
+ metadata['uid'] = uid
+ for key, value in metadata.items():
+ open(os.path.join(metadata_path, key), 'w').write(str(value))
+
+ def retrieve(self, uid, properties=None):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ return metadatareader.retrieve(dir_path, properties)
+
+ def delete(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ for key in os.listdir(metadata_path):
+ os.remove(os.path.join(metadata_path, key))
+ os.rmdir(metadata_path)
+
+ def get_property(self, uid, key):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ property_path = os.path.join(metadata_path, key)
+ if os.path.exists(property_path):
+ return open(property_path, 'r').read()
+ else:
+ return None
+
+ def set_property(self, uid, key, value):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ property_path = os.path.join(metadata_path, key)
+ open(property_path, 'w').write(value)
+
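
MetadataStore keeps one file per property under <entry>/metadata/, so a single property can be read or rewritten without touching the rest of the entry. A round-trip sketch with an illustrative 36-character uid (retrieval goes through the metadatareader extension above):

    from olpc.datastore.metadatastore import MetadataStore

    store = MetadataStore()
    uid = '0af4e43b-e8b2-4e7c-8d8a-5f2a3d4e5f6a'
    store.store(uid, {'title': 'My drawing', 'mime_type': 'image/png'})

    print store.get_property(uid, 'title')        # -> 'My drawing'
    store.set_property(uid, 'title', 'Renamed')
    print store.retrieve(uid, ['uid', 'title'])   # only the requested keys
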
diff --git a/src/olpc/datastore/migration.py b/src/olpc/datastore/migration.py
new file mode 100644
index 0000000..7b47d6f
--- /dev/null
+++ b/src/olpc/datastore/migration.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Transform a DataStore directory into a newer on-disk format.
+"""
+
+import os
+import logging
+import shutil
+
+import cjson
+
+from olpc.datastore import layoutmanager
+
+def migrate_from_0():
+ logging.info('Migrating datastore from version 0 to version 1')
+ root_path = layoutmanager.get_instance().get_root_path()
+ old_root_path = os.path.join(root_path, 'store')
+ for f in os.listdir(old_root_path):
+ uid, ext = os.path.splitext(f)
+ if ext != '.metadata':
+ continue
+
+ logging.debug('Migrating entry %r' % uid)
+ try:
+ _migrate_metadata(root_path, old_root_path, uid)
+ _migrate_file(root_path, old_root_path, uid)
+ _migrate_preview(root_path, old_root_path, uid)
+ except Exception:
+ #logging.warning('Failed to migrate entry %r:%s\n' %(uid,
+ # ''.join(traceback.format_exception(*sys.exc_info()))))
+ #
+ # In production, we may choose to ignore errors when failing to
+ # migrate some entries. But for now, raise them.
+ raise
+
+ # Just be paranoid, it's cheap.
+ if old_root_path.endswith('datastore/store'):
+ shutil.rmtree(old_root_path)
+
+ logging.info('Migration finished')
+
+def _migrate_metadata(root_path, old_root_path, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ os.makedirs(metadata_path)
+
+ old_metadata_path = os.path.join(old_root_path, uid + '.metadata')
+ metadata = cjson.decode(open(old_metadata_path, 'r').read())
+ for key, value in metadata.items():
+ f = open(os.path.join(metadata_path, key), 'w')
+ try:
+ f.write(str(value))
+ finally:
+ f.close()
+
+def _migrate_file(root_path, old_root_path, uid):
+ if os.path.exists(os.path.join(old_root_path, uid)):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ os.rename(os.path.join(old_root_path, uid),
+ os.path.join(dir_path, 'data'))
+
+def _migrate_preview(root_path, old_root_path, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ os.rename(os.path.join(old_root_path, 'preview', uid),
+ os.path.join(metadata_path, 'preview'))
+
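
migrate_from_0() converts the flat version-0 tree, where each entry was a payload file plus a single cjson blob of properties, into the sharded version-1 layout the rest of this commit assumes. Schematically, for one entry (profile and uid illustrative):

    version 0:
        ~/.sugar/default/datastore/store/<uid>               payload
        ~/.sugar/default/datastore/store/<uid>.metadata      cjson blob of all properties
        ~/.sugar/default/datastore/store/preview/<uid>       preview image

    version 1:
        ~/.sugar/default/datastore/version                   contains "1"
        ~/.sugar/default/datastore/index/                    xapian database
        ~/.sugar/default/datastore/<uid[:2]>/<uid>/data      payload
        ~/.sugar/default/datastore/<uid[:2]>/<uid>/metadata/<key>
        ~/.sugar/default/datastore/<uid[:2]>/<uid>/metadata/preview
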
diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py
deleted file mode 100644
index e4a3e3b..0000000
--- a/src/olpc/datastore/model.py
+++ /dev/null
@@ -1,412 +0,0 @@
-"""
-olpc.datastore.model
-~~~~~~~~~~~~~~~~~~~~
-The datamodel for the metadata
-
-"""
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-import datetime
-import os
-import time
-import warnings
-import logging
-
-from sugar import mime
-
-from olpc.datastore.utils import timeparse
-
-
-# XXX: Open issues
-# list properties - Contributors (a, b, c)
-# difficult to index now
-# content state - searches don't include content deletion flag
-# - not recording if content is on other storage yet
-
-propertyTypes = {}
-_marker = object()
-
-def registerPropertyType(kind, get, set, xapian_sort_type=None,
- defaults=None, for_xapian=None, from_xapain=None):
- propertyTypes[kind] = PropertyImpl(get, set, xapian_sort_type,
- defaults, for_xapian=for_xapian, from_xapain=from_xapain)
-
-def propertyByKind(kind): return propertyTypes[kind]
-
-class PropertyImpl(object):
- __slots__ = ('_get', '_set', 'xapian_sort_type', 'defaults', '_for_xapian', '_from_xapian')
-
- def __init__(self, get, set, xapian_sort_type=None, defaults=None,
- for_xapian=None, from_xapain=None):
- self._get, self._set = get, set
- self.xapian_sort_type = xapian_sort_type
- self.defaults = defaults
- if not for_xapian: for_xapian = self._get
- self._for_xapian = for_xapian
- if not from_xapain: from_xapain = self._set
- self._from_xapian = from_xapain
-
- def get(self, value): return self._get(value)
- def set(self, value): return self._set(value)
- def for_xapian(self, value): return self._for_xapian(value)
- def from_xapian(self, value): return self._from_xapian(value)
-
-class Property(object):
- """Light-weight property implementation.
- Handles typed properties via a global registry of type->callbacks
-
- >>> p = Property(key, value, 'string')
- >>> b = Property(key, value, 'binary')
- """
- def __init__(self, key, value, kind=None):
-
- self.kind = kind
- if kind not in propertyTypes:
- warnings.warn("Unknown property type: %s on key %s" % \
- (kind, key), RuntimeWarning)
- else: self._impl = propertyTypes[kind]
-
- self.key = key
- self.value = value
-
- @classmethod
- def fromstring(cls, key, value=''):
- kind = 'string'
- if ':' in key:
- key, kind = key.split(':', 1)
- # now resolve the kind to a property class
- return cls(key, value, kind)
-
-
- def __repr__(self):
- return "<%s(%s) %s:%r>" % (self.__class__.__name__,
- self.kind,
- self.key, self.value)
-
- def get_value(self): return self._impl.get(self._value)
- def set_value(self, value): self._value = self._impl.set(value)
- value = property(get_value, set_value)
-
- @property
- def for_xapian(self): return self._impl.for_xapian(self._value)
-
-
- def __str__(self): return str(self.value)
-
-class Model(object):
- """Object containing the field/property model used by the
- system"""
-
- def __init__(self):
- self.fields = {}
- self.fieldnames = []
-
- def copy(self):
- m = Model()
- m.fields = self.fields.copy()
- m.fieldnames = self.fieldnames[:]
- return m
-
- def fromstring(self, key, value, allowAddition=False):
- """create a property from the key name by looking it up in the
- model."""
- kind = None
- if ':' in key: key, kind = key.split(':', 1)
- added = False
- field = self.fields.get(key)
- if field: mkind = field[1]
- elif allowAddition:
- # create a new field, this will force a change in the
- # model
- # and in turn should add a new field action
- if not kind: kind = "string"
- self.addField(key,kind)
- mkind = kind
- added = True
- else:
- raise KeyError("no field specification for %s" % key)
-
- if kind and mkind:
- if kind != mkind: raise ValueError("""Specified wire
- encoding for property %s was %s, expected %s""" %(key, kind, mkind))
- kind = mkind
-
- return Property(key, value, kind), added
-
-
- def addField(self, key, kind, overrides=None):
- """ Add a field to the model.
- key -- field name
- kind -- type by name (registered with registerPropertyType)
-        overrides -- optional overrides and additions to the default
- arguments supplied by kind
- """
- if key in self.fields:
- raise KeyError("""Another source tried to add %s field to the model""" % key)
-
- impl = propertyByKind(kind)
- options = impl.defaults.copy()
- if overrides: options.update(overrides)
- if impl.xapian_sort_type:
- if 'type' not in options:
- options['type'] = impl.xapian_sort_type
-
- self.fields[key] = (key, kind, options)
- self.fieldnames.append(key)
- return self
-
- def addFields(self, *args):
- """ List of arguments to addField """
- for arg in args: self.addField(*arg)
- return self
-
- def apply(self, indexmanager):
- addField = indexmanager.addField
- for fn in self.fieldnames:
- args = self.fields[fn]
- addField(args[0], **args[2])
-
- def get_external_properties(self):
- external_properties = []
- for field_name in self.fields:
- field = self.fields.get(field_name)
- if field[1] == "external":
- external_properties.append(field[0])
- return external_properties
-
-# Properties we don't automatically include in properties dict
-EXCLUDED_PROPERTIES = ['fulltext', ]
-
-class Content(object):
-    """A lightweight proxy around Xapian documents from secore.
-    It provides additional methods used by the backingstore to
-    assist in storage.
- """
- __slots__ = ('_doc', '_backingstore', '_file', '_model')
-
- def __init__(self, xapdoc, backingstore=None, model=None):
- self._doc = xapdoc
- self._backingstore = backingstore
- self._file = None
- self._model = model
-
- def __repr__(self):
- return "<%s %s>" %(self.__class__.__name__,
- self.properties)
-
- def get_property(self, key, default=_marker):
- result = self._doc.data.get(key, default)
- if result is _marker: raise KeyError(key)
- if isinstance(result, list) and len(result) == 1:
- result = result[0]
- field = self._model.fields.get(key)
- if field[1] == "external":
- return self.get_external_property(key)
- else:
- kind = propertyByKind(field[1])
-            # Errors here usually mean a property was requested for a missing field
- return kind.from_xapian(result)
-
- def get_external_property(self, key):
- return self._backingstore.get_external_property(self.id, key)
-
- @property
- def properties(self):
- d = {}
- for k in self._model.fields:
- if k in EXCLUDED_PROPERTIES: continue
-
- field = self._model.fields.get(k)
- if field:
- if field[1] == "external":
- v = self.get_external_property(k)
- else:
- v = self.data.get(k, _marker)
- if v is _marker: continue
- if isinstance(v, list) and len(v) == 1:
- v = v[0]
- kind = propertyByKind(field[1])
- v = kind.from_xapian(v)
- else:
- # do some generic property handling
- if v: v = str(v)
- else: v = ''
- d[k] = v
- return d
-
- def _get_extension_from_mimetype(self):
- # try to get an extension from the mimetype if available
- mt = self.get_property('mime_type', None)
- if mt is not None:
- ext = mime.get_primary_extension(mt)
- # .ksh is a strange ext for plain text
- if ext and ext == '.ksh': ext = '.txt'
- if ext and ext == '.jpe': ext = '.jpg' # fixes #3163
- return ext
- return None
-
- def suggestName(self):
- # we look for certain known property names
- # - filename
- # - ext
- # and create a base file name that will be used for the
- # checkout name
- filename = self.get_property('filename', None)
- ext = self.get_property('ext', '')
- if not ext:
- ext = self._get_extension_from_mimetype()
-
- logging.debug('Content.suggestName: %r %r' % (filename, ext))
-
- if filename:
- # some backingstores keep the full relative path
- filename = os.path.split(filename)[1]
- f, e = os.path.splitext(filename)
- if e: return filename, None
- if ext: return "%s.%s" % (filename, ext), None
- elif ext:
- return None, ext
-
- return None, None
-
- def get_file(self):
- if not hasattr(self, "_file") or not self._file or \
- self._file.closed is True:
- target, ext = self.suggestName()
- targetfile = self.backingstore._targetFile(self.id, target, ext)
- self._file = targetfile
- return self._file
-
- def set_file(self, fileobj):
- self._file = fileobj
- file = property(get_file, set_file)
-
- @property
- def filename(self): return os.path.abspath(self.file.name)
-
- @property
- def contents(self): return self.file.read()
-
- @property
- def backingstore(self): return self._backingstore
-
- @property
- def id(self): return self._doc.id
-
- @property
- def data(self): return self._doc.data
-
-
-def noop(value): return value
-
-import re
-base64hack = re.compile("(\S{212})")
-def base64enc(value): return ' '.join(base64hack.split(value.encode('base64')))
-def base64dec(value): return value.replace(' ', '').decode('base64')
-
-DATEFORMAT = "%Y-%m-%dT%H:%M:%S"
-def date2string(value): return value.replace(microsecond=0).isoformat()
-def string2date(value): return timeparse(value, DATEFORMAT)
-
-def encode_datetime(value):
- # encode datetime to timestamp (float)
- # parse the typelib form to a datetime first
- if isinstance(value, basestring): value = string2date(value)
- return str(time.mktime(value.timetuple()))
-
-def decode_datetime(value):
- # convert a float to a local datetime
- return datetime.datetime.fromtimestamp(float(value)).isoformat()
-
-def datedec(value, dateformat=DATEFORMAT):
-    return timeparse(value, dateformat)
-
-def dateenc(value, dateformat=DATEFORMAT):
- if isinstance(value, basestring):
- # XXX: there is an issue with microseconds not getting parsed
-        value = timeparse(value, dateformat)
- value = value.replace(microsecond=0)
- return value.isoformat()
-
-
-
-# type, get, set, xapian sort type [string|float|date], defaults
-# defaults are the default options to addField in IndexManager
-# these can be overridden on model assignment
-registerPropertyType('string', noop, noop, 'string', {'store' : True,
- 'exact' : True,
- 'sortable' : True})
-
-registerPropertyType('text', noop, noop, 'string', {'store' : True,
- 'exact' : False,
- 'sortable' : False,
- 'collapse' : True,
- })
-
-registerPropertyType('binary', noop, noop, None, {'store' : True,
- 'exact' : False,
- 'fulltext': False,
- 'sortable' : False})
-
-registerPropertyType('int', str, int, 'float', {'store' : True,
- 'exact' : True,
- 'sortable' : True},
- for_xapian=str)
-
-registerPropertyType('number', str, float, 'float', {'store' : True,
- 'exact' : True,
- 'sortable' : True})
-
-registerPropertyType('date', dateenc, datedec, 'float', {'store' : True,
- 'exact' : True,
- 'sortable' : True
- },
- for_xapian=encode_datetime,
- from_xapain=decode_datetime)
-
-
-registerPropertyType('external', noop, noop, 'string', {'external' : True,
- 'store' : False,
- 'exact' : False,
- 'fulltext' : False,
- })
-
-defaultModel = Model().addFields(
- ('fulltext', 'text'),
- # vid is version id
- ('vid', 'number'),
- ('checksum', 'string'),
- ('filename', 'string'),
-    ('ext', 'string'), # it's possible we don't store a filename, but
- # only an extension we are interested in
- # Title has additional weight
- ('title', 'text', {'weight' : 2 }),
- ('url', 'string'),
- ('mime_type', 'string'),
- ('author', 'string'),
- ('language', 'string'),
- ('ctime', 'date'),
- ('mtime', 'date'),
- # Better store the timestamp instead of date strings
- ('timestamp', 'int'),
- # this will just be a space delimited list of tags
- # indexed with the content
- # I give them high weight as they have user given semantic value.
- ('tags', 'text', {'weight' :3 } ),
-
- # olpc specific
- ('activity', 'string'),
- ('activity_id', 'string'),
- ('title_set_by_user', 'text'),
- ('keep', 'int'),
- ('icon-color', 'string'),
- ('preview', 'external'),
- ('buddies', 'text'),
- ('source', 'text'),
- ('description', 'text'),
- )
-
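
For readers skimming the removed model.py: typed properties ran through a small global registry (registerPropertyType / propertyByKind) mapping a kind name to encode/decode callbacks plus Xapian field defaults. A minimal, self-contained sketch of the same pattern, with illustrative names that are not part of the datastore API:

# Simplified illustration of the property-type registry pattern used by the
# removed model.py. register_kind() and Prop are hypothetical names.
_kinds = {}

def register_kind(name, encode, decode):
    # encode: python value -> stored string; decode: stored string -> python value
    _kinds[name] = (encode, decode)

class Prop(object):
    def __init__(self, key, value, kind='string'):
        self.key, self.kind = key, kind
        self._encode, self._decode = _kinds[kind]
        self.stored = self._encode(value)

    @property
    def value(self):
        return self._decode(self.stored)

register_kind('string', str, str)
register_kind('int', str, int)

p = Prop('keep', 1, 'int')
assert p.stored == '1' and p.value == 1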
diff --git a/src/olpc/datastore/optimizer.py b/src/olpc/datastore/optimizer.py
new file mode 100644
index 0000000..ed62e55
--- /dev/null
+++ b/src/olpc/datastore/optimizer.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import errno
+import logging
+
+import gobject
+
+from olpc.datastore import layoutmanager
+
+class Optimizer(object):
+ """Optimizes disk space usage by detecting duplicates and sharing storage.
+ """
+ def __init__(self, file_store, metadata_store):
+ self._file_store = file_store
+ self._metadata_store = metadata_store
+ self._enqueue_checksum_id = None
+
+ def optimize(self, uid):
+ """Add an entry to a queue of entries to be checked for duplicates.
+
+ """
+ if not os.path.exists(self._file_store.get_file_path(uid)):
+ return
+
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ open(os.path.join(queue_path, uid), 'w').close()
+ logging.debug('optimize %r' % os.path.join(queue_path, uid))
+
+ if self._enqueue_checksum_id is None:
+ self._enqueue_checksum_id = \
+ gobject.idle_add(self._process_entry_cb,
+ priority=gobject.PRIORITY_LOW)
+
+ def remove(self, uid):
+ """Remove any structures left from space optimization
+
+ """
+ checksum = self._metadata_store.get_property(uid, 'checksum')
+ if checksum is None:
+ return
+
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('remove %r' % os.path.join(checksum_path, uid))
+ os.remove(os.path.join(checksum_path, uid))
+ try:
+ os.rmdir(checksum_path)
+ logging.debug('removed %r' % checksum_path)
+ except OSError, e:
+ if e.errno != errno.ENOTEMPTY:
+ raise
+
+ def _identical_file_already_exists(self, checksum):
+ """Check if we already have files with this checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(checksum_path)
+
+ def _get_uid_from_checksum(self, checksum):
+        """Get an existing entry whose file matches this checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ first_uid = os.listdir(checksum_path)[0]
+ return first_uid
+
+ def _create_checksum_dir(self, checksum):
+ """Create directory that tracks files with this same checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ logging.debug('create dir %r' % checksum_path)
+ os.mkdir(checksum_path)
+
+ def _add_checksum_entry(self, uid, checksum):
+ """Create a file in the checksum dir with the uid of the entry
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('touch %r' % os.path.join(checksum_path, uid))
+ open(os.path.join(checksum_path, uid), 'w').close()
+
+ def _already_linked(self, uid, checksum):
+ """Check if this entry's file is already a hard link to the checksums
+ dir.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(os.path.join(checksum_path, uid))
+
+ def _process_entry_cb(self):
+        """Process one item in the checksums queue by calculating its checksum,
+        checking whether an identical file already exists, and in that case
+        substituting the entry's file with a hard link to that pre-existing file.
+
+ """
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ queue = os.listdir(queue_path)
+ if queue:
+ uid = queue[0]
+ logging.debug('_process_entry_cb processing %r' % uid)
+ file_in_entry_path = self._file_store.get_file_path(uid)
+ checksum = self._calculate_md5sum(file_in_entry_path)
+ self._metadata_store.set_property(uid, 'checksum', checksum)
+
+ if self._identical_file_already_exists(checksum):
+ if not self._already_linked(uid, checksum):
+ existing_entry_uid = self._get_uid_from_checksum(checksum)
+ self._file_store.hard_link_entry(uid, existing_entry_uid)
+
+ self._add_checksum_entry(uid, checksum)
+ else:
+ self._create_checksum_dir(checksum)
+ self._add_checksum_entry(uid, checksum)
+
+ os.remove(os.path.join(queue_path, uid))
+
+ if len(queue) <= 1:
+ self._enqueue_checksum_id = None
+ return False
+ else:
+ return True
+
+ def _calculate_md5sum(self, path):
+ """Calculate the md5 checksum of a given file.
+
+ """
+ in_, out = os.popen2(['md5sum', path])
+ return out.read().split(' ', 1)[0]
+
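
The Optimizer added above keys storage on a file's md5: the first entry with a given checksum keeps its file, later identical entries get their file replaced by a hard link, and empty marker files named after each uid record who shares the checksum. A rough, self-contained sketch of that scheme, using hashlib instead of shelling out to md5sum; file_path_for() and the directory layout are stand-ins for what FileStore and layoutmanager provide in the real code:

# Sketch of the checksum/hard-link deduplication used by Optimizer above.
# file_path_for(uid) is a hypothetical callback returning the entry's file path.
import hashlib
import os

def md5sum(path):
    md5 = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            md5.update(chunk)
    return md5.hexdigest()

def deduplicate(uid, checksums_dir, file_path_for):
    path = file_path_for(uid)
    checksum_dir = os.path.join(checksums_dir, md5sum(path))
    marker = os.path.join(checksum_dir, uid)
    if not os.path.isdir(checksum_dir):
        # first file seen with this checksum: it becomes the shared copy
        os.mkdir(checksum_dir)
    elif not os.path.exists(marker):
        # an identical file already exists: hard-link this entry to it
        existing_uid = os.listdir(checksum_dir)[0]
        os.remove(path)
        os.link(file_path_for(existing_uid), path)
    # an empty marker file records that this uid shares the checksum
    open(marker, 'w').close()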
diff --git a/src/olpc/datastore/utils.py b/src/olpc/datastore/utils.py
deleted file mode 100644
index 0505463..0000000
--- a/src/olpc/datastore/utils.py
+++ /dev/null
@@ -1,162 +0,0 @@
-import datetime
-import dbus
-import re
-import time
-
-
-class Singleton(type):
- """A singleton metaclass
-
- >>> class MyManager(object):
- ... __metaclass__ = Singleton
- >>> a = MyManager()
- >>> b = MyManager()
- >>> assert a is b
-
- """
- def __init__(cls,name,bases,dic):
- super(Singleton,cls).__init__(name,bases,dic)
- cls._instance=None
- def __call__(cls,*args,**kw):
- if cls._instance is None:
- cls._instance=super(Singleton,cls).__call__(*args,**kw)
- return cls._instance
-
-class partial:
- def __init__(self, fun, *args, **kwargs):
- self.fun = fun
- self.pending = args
- self.kwargs = kwargs
-
- def __call__(self, *args, **kwargs):
- if kwargs and self.kwargs:
- kw = self.kwargs.copy()
- kw.update(kwargs)
- else:
- kw = kwargs or self.kwargs
-
- return self.fun(*(self.pending + args), **kw)
-
-def once(method):
- "A decorator that runs a method only once."
- attrname = "_called"
- def decorated(self, *args, **kwargs):
- try:
- return getattr(method, attrname)
- except AttributeError:
- r = method(self, *args, **kwargs)
- setattr(method, attrname, r)
- return r
- return decorated
-
-
-
-def create_uid():
- # this is linux specific but easily changed
- # Python 2.5 has universal support for this built in
- return open('/proc/sys/kernel/random/uuid', 'r').read()[:-1]
-
-
-def options_for(dict, prefix, invert=False):
- """return a dict of the filtered properties for keys with prefix.
- prefix will be removed
-
- If invert is True then only those keys not matching prefix are returned.
-
- >>> assert options_for({'app.a.option' : 1, 'app.b.option' : 2}, 'app.b.')['option'] == 2
- """
- d = {}
- l = len(prefix)
- for k, v in dict.iteritems():
- if k.startswith(prefix):
- if invert is False:d[k[l:]] = v
- elif invert is True:
- d[k] = v
-
- return d
-
-
-
-def _convert(arg):
- # this recursively processes arguments sent over dbus and yields
- # normalized versions
- if isinstance(arg, (dbus.String, dbus.UTF8String)):
- try: return arg.encode('utf-8')
- except: return str(arg)
-
- if isinstance(arg, (dbus.Dictionary, dict)):
- d = {}
- for k, v in arg.iteritems():
- # here we call str on the lhs making it suitable for
- # passing as keywords args
- d[str(_convert(k))] = _convert(v)
- return d
-
- if isinstance(arg, dbus.Array):
- a = []
- for item in arg:
- a.append(_convert(item))
- return a
- return arg
-
-
-def sanitize_dbus(method):
-    # decorator that converts dbus argument types to plain Python
-    # types before calling the method
- def decorator(self, *args, **kwargs):
- n = []
- for arg in args: n.append(_convert(arg))
- kw = _convert(kwargs)
- return method(self, *n, **kw)
- return decorator
-
-DATEFORMAT = "%Y-%m-%dT%H:%M:%S"
-def timeparse(t, format=DATEFORMAT):
- """Parse a time string that might contain fractions of a second.
-
- Fractional seconds are supported using a fragile, miserable hack.
- Given a time string like '02:03:04.234234' and a format string of
- '%H:%M:%S', time.strptime() will raise a ValueError with this
- message: 'unconverted data remains: .234234'. If %S is in the
- format string and the ValueError matches as above, a datetime
- object will be created from the part that matches and the
- microseconds in the time string.
- """
- try:
- return datetime.datetime(*time.strptime(t, format)[0:6])
- except ValueError, msg:
- if "%S" in format:
- msg = str(msg)
- mat = re.match(r"unconverted data remains:"
- " \.([0-9]{1,6})$", msg)
- if mat is not None:
- # fractional seconds are present - this is the style
- # used by datetime's isoformat() method
- frac = "." + mat.group(1)
- t = t[:-len(frac)]
- t = datetime.datetime(*time.strptime(t, format)[0:6])
- microsecond = int(float(frac)*1e6)
- return t.replace(microsecond=microsecond)
- else:
- mat = re.match(r"unconverted data remains:"
- " \,([0-9]{3,3})$", msg)
- if mat is not None:
- # fractional seconds are present - this is the style
- # used by the logging module
- frac = "." + mat.group(1)
- t = t[:-len(frac)]
- t = datetime.datetime(*time.strptime(t, format)[0:6])
- microsecond = int(float(frac)*1e6)
- return t.replace(microsecond=microsecond)
-
- raise
-
-
-def parse_timestamp_or_float(value):
- result = None
- try:
- result = timeparse(value)
- result = str(time.mktime(result.timetuple()))
- except:
- result = str(float(value))
- return result
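
The timeparse() removed above works around strptime's lack of fractional-second support by catching the "unconverted data remains" ValueError and re-attaching the microseconds by hand. On Python 2.6 and later the %f directive does the job directly; a sketch of the equivalent (not a drop-in replacement, since the removed code also accepts the logging module's comma-separated style):

# Sketch: parse ISO-style timestamps with optional fractional seconds using
# strptime's %f directive (available since Python 2.6).
import datetime

def parse_iso(value):
    for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.datetime.strptime(value, fmt)
        except ValueError:
            pass
    raise ValueError("unparseable timestamp: %r" % value)

assert parse_iso("2008-10-08T16:37:38.123456").microsecond == 123456
assert parse_iso("2008-10-08T16:37:38").microsecond == 0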
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
deleted file mode 100644
index e8a5ee5..0000000
--- a/src/olpc/datastore/xapianindex.py
+++ /dev/null
@@ -1,575 +0,0 @@
-"""
-xapianindex
-~~~~~~~~~~~~~~~~~~~~
-maintain indexes on content
-
-"""
-from __future__ import with_statement
-
-__author__ = 'Benjamin Saller <bcsaller@objectrealms.net>'
-__docformat__ = 'restructuredtext'
-__copyright__ = 'Copyright ObjectRealms, LLC, 2007'
-__license__ = 'The GNU Public License V2+'
-
-
-
-from Queue import Queue, Empty
-import gc
-import logging
-import re
-import sys
-import time
-import thread
-import threading
-import warnings
-import traceback
-
-import secore
-import xapian as _xapian # we need to modify the QueryParser
-import gobject
-
-from olpc.datastore import model
-from olpc.datastore.converter import converter
-from olpc.datastore.utils import create_uid, parse_timestamp_or_float
-
-
-# Setup Logger
-logger = logging.getLogger('org.sugar.datastore.xapianindex')
-
-# Indexer Operations
-CREATE = 1
-UPDATE = 2
-DELETE = 3
-
-# Force a flush every _n_ changes to the db
-FLUSH_THRESHOLD = 20
-
-# Force a flush after _n_ seconds since the last change to the db
-FLUSH_TIMEOUT = 60
-
-class ReadWriteConnection(secore.indexerconnection.IndexerConnection,
- secore.searchconnection.SearchConnection):
- # has search methods on a write connection
- pass
-
-class ContentMappingIter(object):
- """An iterator over a set of results from a search.
-
- """
- def __init__(self, results, backingstore, model):
- self._results = results
- self._backingstore = backingstore
- self._iter = iter(results)
- self._model = model
-
- def __iter__(self): return self
-
- def next(self):
- searchresult = self._iter.next()
- return model.Content(searchresult, self._backingstore, self._model)
-
-
-class IndexManager(object):
- DEFAULT_DATABASE_NAME = 'index'
-
- def __init__(self, default_language='en'):
- # We will maintain two connections to the database
- # we trigger automatic flushes to the read_index
- # after any write operation
- self.write_index = None
- self.read_index = None
- self.queue = Queue(0)
- self.indexer_running = False
- self.language = default_language
-
- self.backingstore = None
-
- self.fields = set()
- self._write_lock = threading.Lock()
-
- self.deltact = 0 # delta count
- self._flush_timeout = None
-
- #
- # Initialization
- def connect(self, repo, **kwargs):
- logging.debug('IndexManager.connect()')
- if self.write_index is not None:
- warnings.warn('''Requested redundant connect to index''',
- RuntimeWarning)
-
- self.repo = repo
- self.write_index = ReadWriteConnection(repo)
-
- # configure the database according to the model
- datamodel = kwargs.get('model', model.defaultModel)
- datamodel.apply(self)
-
- # store a reference
- self.datamodel = datamodel
-
- self.read_index = self.write_index
-
- self.flush(force=True)
- # by default we start the indexer now
- self.startIndexer()
- assert self.indexer.isAlive()
-
-
- def bind_to(self, backingstore):
-        # signal from backingstore that it's our parent
- self.backingstore = backingstore
-
-
- def stop(self, force=False):
- logging.debug('IndexManager.stop()')
- self.flush(force=True)
- self.stopIndexer(force)
-
- # TODO: Would be better to check if the device is present and
- # don't try to close if it's not.
- try:
- self.write_index.close()
- except _xapian.DatabaseError, e:
- logging.debug('Index close failed:\n' + \
- ''.join(traceback.format_exception(*sys.exc_info())))
-
- #self.read_index.close()
-        # XXX: workaround for xapian not having close(); this will
-        # change in the future. In the meantime we delete the
-        # references to the indexers and then force gc() to run,
-        # which should in turn trigger the C++ destructor and force
-        # the database shut.
- self.write_index = None
- self.read_index = None
- gc.collect()
-
- # Index thread management
- def startIndexer(self):
- self.indexer_running = True
- self.indexer = threading.Thread(target=self.indexThread)
- self.indexer.setDaemon(True)
- self.indexer.start()
-
- def stopIndexer(self, force=False):
- if not self.indexer_running: return
- if not force: self.queue.join()
- self.indexer_running = False
- # should terminate after the current task
- self.indexer.join()
-
- def _flush_timeout_cb(self):
- self.flush(True)
- return False
-
- # flow control
- def flush(self, force=False):
-        """Called after any database mutation"""
-        logging.debug('IndexManager.flush: %r %r' % (force, self.deltact))
-
- if self._flush_timeout is not None:
- gobject.source_remove(self._flush_timeout)
- self._flush_timeout = None
-
- self.deltact += 1
- if force or self.deltact > FLUSH_THRESHOLD:
- with self._write_lock:
-
- # TODO: Would be better to check if the device is present and
- # don't try to flush if it's not.
- try:
- self.write_index.flush()
- except _xapian.DatabaseError, e:
- logging.debug('Index flush failed:\n' + \
- ''.join(traceback.format_exception(*sys.exc_info())))
-
- #self.read_index.reopen()
- self.deltact = 0
- else:
- self._flush_timeout = gobject.timeout_add(FLUSH_TIMEOUT * 1000,
- self._flush_timeout_cb)
-
- def enque(self, uid, vid, doc, operation, filestuff=None):
-        # here we implement the sync/async policy:
-        # create/update operations set the properties right away,
-        # while the conversion/fulltext indexing
-        # can happen in the thread
- if operation in (CREATE, UPDATE):
- with self._write_lock:
- if operation is CREATE:
- self.write_index.add(doc)
- logger.info("created %s:%s" % (uid, vid))
- elif operation is UPDATE:
- self.write_index.replace(doc)
- logger.info("updated %s:%s" % (uid, vid))
- self.flush()
-
- # Disable content indexing for Trial-3.
- # https://dev.laptop.org/ticket/3058
- return
-
- # now change CREATE to UPDATE as we set the
- # properties already
- operation = UPDATE
- if not filestuff:
- # In this case we are done
- return
- elif operation is DELETE:
- # sync deletes
- with self._write_lock:
- self.write_index.delete(uid)
- logger.info("deleted content %s:%s" % (uid,vid))
- self.flush()
- return
-
- self.queue.put((uid, vid, doc, operation, filestuff))
-
- def indexThread(self):
- # process the queue
- # XXX: there is currently no way to remove items from the queue
- # for example if a USB stick is added and quickly removed
- # the mount should however get a stop() call which would
- # request that the indexing finish
- # XXX: we can in many cases index, not from the tempfile but
- # from the item in the repo as that will become our immutable
- # copy. Detect those cases and use the internal filename
- # property or backingstore._translatePath to get at it
- while self.indexer_running:
- # include timeout here to ease shutdown of the thread
- # if this is a non-issue we can simply allow it to block
- try:
- data = self.queue.get(True, 0.025)
- uid, vid, doc, operation, filestuff = data
- except Empty:
- #time.sleep(1.0)
- continue
-
- try:
- with self._write_lock:
- if operation is UPDATE:
- # Here we handle the conversion of binary
- # documents to plain text for indexing. This is
- # done in the thread to keep things async and
- # latency lower.
- # we know that there is filestuff or it
- # wouldn't have been queued
- filename, mimetype = filestuff
- if isinstance(filename, file):
- filename = filename.name
- fp = converter(filename, mimetype)
- if fp:
- # read in at a fixed block size, try to
- # conserve memory. If this doesn't work
- # we can make doc.fields a generator
- while True:
- chunk = fp.read(2048)
- if not chunk: break
- doc.fields.append(secore.Field('fulltext', chunk))
-
- self.write_index.replace(doc)
- logger.info("update file content %s:%s" % (uid, vid))
- else:
- logger.warning("""Conversion process failed for document %s %s""" % (uid, filename))
- else:
- logger.warning("Unknown indexer operation ( %s: %s)" % (uid, operation))
-
- # tell the queue its complete
- self.queue.task_done()
-
- # we do flush on each record now
- self.flush()
- except:
- logger.exception("Error in indexer")
-
-
- def complete_indexing(self):
- """Intentionally block until the indexing is complete. Used
- primarily in testing.
- """
- self.queue.join()
- self.flush()
-
- #
- # Field management
- def addField(self, key, store=True, exact=False, sortable=False, fulltext=True,
- type='string', collapse=False,
- **kwargs):
- language = kwargs.pop('language', self.language)
- external = kwargs.pop('external', False)
-
- xi = self.write_index.add_field_action
-
- try:
- if store: xi(key, secore.FieldActions.STORE_CONTENT)
- if exact: xi(key, secore.FieldActions.INDEX_EXACT)
- elif fulltext:
- # weight -- int 1 or more
- # nopos -- don't include positional information
- # noprefix -- boolean
- xi(key, secore.FieldActions.INDEX_FREETEXT, language=language, **kwargs)
-
- if sortable:
- xi(key, secore.FieldActions.SORTABLE, type=type)
- if collapse:
- xi(key, secore.FieldActions.COLLAPSE)
- except secore.IndexerError, e:
- logging.warning('Could not add field %r: %s' % (key, e))
-
- # track this to find missing field configurations
- self.fields.add(key)
-
- #
- # Index Functions
- def _mapProperties(self, props):
- """data normalization function, maps dicts of key:kind->value
- to Property objects
- """
- d = {}
- add_anything = False
- for k,v in props.iteritems():
- p, added = self.datamodel.fromstring(k, v,
- allowAddition=True)
- if added is True:
- self.fields.add(p.key)
- add_anything = True
- d[p.key] = p
-
- if add_anything:
- with self._write_lock:
- self.datamodel.apply(self)
-
- return d
-
- def index(self, props, filename=None):
- """Index the content of an object.
- Props must contain the following:
- key -> Property()
- """
- operation = UPDATE
- #
- # Version handling
- #
-        # we implicitly create new versions of documents; the version
-        # id should have been set by the higher level system
- uid = props.pop('uid', None)
- vid = props.pop('vid', None)
-
- if not uid:
- uid = create_uid()
- operation = CREATE
-
- if vid: vid = str(float(vid) + 1.0)
- else: vid = "1.0"
-
- # Property mapping via model
- props = self._mapProperties(props)
- doc = secore.UnprocessedDocument()
- add = doc.fields.append
- fp = None
-
-
- filestuff = None
- if filename:
- # enque async file processing
-            # XXX: to make sure the file is kept around we could keep
-            # an open fp?
- mimetype = props.get("mime_type")
- mimetype = mimetype and mimetype.value or 'text/plain'
- filestuff = (filename, mimetype)
-
- doc.id = uid
- add(secore.Field('vid', vid))
-
- #
- # Property indexing
- for k, prop in props.iteritems():
- value = prop.for_xapian
-
- if k not in self.fields:
- warnings.warn("""Missing field configuration for %s""" % k,
- RuntimeWarning)
- continue
-
- # XXX: wrap this in a proper API
- if self.datamodel.fields[k][1] == "external":
- # XXX: this is done directly inline and could block,
- # but the expected datasize is still small
- self.backingstore.set_external_property(uid, k, value)
- else:
- add(secore.Field(k, value))
-
- # queue the document for processing
- self.enque(uid, vid, doc, operation, filestuff)
-
- return uid
-
- def get(self, uid):
- doc = self.read_index.get_document(uid)
- if not doc: raise KeyError(uid)
- return model.Content(doc, self.backingstore, self.datamodel)
-
- def delete(self, uid):
- # does this need queuing?
- # the higher level abstractions have to handle interaction
- # with versioning policy and so on
- self.enque(uid, None, None, DELETE)
-
- #
- # Search
- def search(self, query, start_index=0, end_index=4096, order_by=None):
- """search the xapian store.
-        query is a string defining the search in standard web search syntax.
-
-        i.e. it contains a set of search terms. Each search term may be
-        preceded by a "+" sign to indicate that the term is required, or a "-"
-        to indicate that it is required to be absent.
- """
- ri = self.read_index
- if not query:
- q = self.read_index.query_all()
- elif isinstance(query, dict):
- queries = []
- q = query.pop('query', None)
- if q:
- queries.append(self.parse_query(q))
- if not query and not queries:
- # we emptied it
- q = self.read_index.query_all()
- else:
- # each term becomes part of the query join
- for k, v in query.iteritems():
- if isinstance(v, dict):
- # it might be a range scan
- # this needs to be factored out
- # and/or we need client side lib that helps
- # issue queries because there are type
- # conversion issues here
- start = v.pop('start', 0)
- end = v.pop('end', sys.maxint)
- start = parse_timestamp_or_float(start)
- end = parse_timestamp_or_float(end)
- queries.append(ri.query_range(k, start, end))
- elif isinstance(v, list):
- # construct a set of OR queries
- ors = []
- for item in v: ors.append(ri.query_field(k, item))
- queries.append(ri.query_composite(ri.OP_OR, ors))
- else:
- queries.append(ri.query_field(k, v))
-
- q = ri.query_composite(ri.OP_AND, queries)
- else:
- q = self.parse_query(query)
-
- if isinstance(order_by, (list, tuple)):
- # secore only handles a single item, not a multilayer sort
- if order_by: order_by = order_by[0]
- else: order_by = None
-
- results = ri.search(q, start_index, end_index, sortby=order_by,
- checkatleast=sys.maxint)
- count = results.matches_estimated
-
- # map the result set to model.Content items
- return ContentMappingIter(results, self.backingstore, self.datamodel), count
-
- def get_all_ids(self):
- return [ti.term[1:] for ti in self.read_index._index.allterms('Q')]
-
- def get_uniquevaluesfor(self, property):
- # XXX: this is very sketchy code
- # try to get the searchconnection to support this directly
- # this should only apply to EXACT fields
- r = set()
- prefix = self.read_index._field_mappings.get_prefix(property)
- plen = len(prefix)
- termiter = self.read_index._index.allterms(prefix)
- for t in termiter:
- term = t.term
- if len(term) > plen:
- term = term[plen:]
- if term.startswith(':'): term = term[1:]
- r.add(term)
-
- # r holds the textual representation of the fields value set
- # if the type of field or property needs conversion to a
- # different python type this has to happen now
- descriptor = self.datamodel.fields.get(property)
- if descriptor:
- kind = descriptor[1]
- impl = model.propertyByKind(kind)
- r = set([impl.set(i) for i in r])
-
- return r
-
- def parse_query(self, query):
- # accept standard web query like syntax
- # 'this' -- match this
- # 'this that' -- match this and that in document
-        # '"this that"' match the exact phrase 'this that'
- # 'title:foo' match a document whose title contains 'foo'
- # 'title:"A tale of two datastores"' exact title match
- # '-this that' match that w/o this
-
- # limited support for wildcard searches
-
- qp = _xapian.QueryParser
-
- flags = (qp.FLAG_LOVEHATE)
-
- ri = self.read_index
- start = 0
- end = len(query)
- nextword = re.compile("(\S+)")
- endquote = re.compile('(")')
- queries = []
- while start < end:
- m = nextword.match(query, start)
- if not m: break
- orig = start
- field = None
- start = m.end() + 1
- word = m.group(1)
- if ':' in word:
- # see if its a field match
- fieldname, w = word.split(':', 1)
- if fieldname in self.fields:
- field = fieldname
-
- word = w
-
- if word.startswith('"'):
- qm = endquote.search(query, start)
- if qm:
- #XXX: strip quotes or not here
- #word = query[orig+1:qm.end(1)-1]
- word = query[orig:qm.end(1)]
- # this is a phrase modify the flags
- flags |= qp.FLAG_PHRASE
- start = qm.end(1) + 1
-
- if field:
- queries.append(ri.query_field(field, word))
- else:
- if word.endswith("*"):
- flags |= qp.FLAG_WILDCARD
- q = self._query_parse(word, flags)
-
- queries.append(q)
- q = ri.query_composite(ri.OP_AND, queries)
- return q
-
- def _query_parse(self, word, flags=0, op=None):
-        # while newer secore does pass flags it doesn't allow control
- # over them at the API level. We override here to support
- # adding wildcard searching
- ri = self.read_index
- if op is None: op = ri.OP_AND
- qp = ri._prepare_queryparser(None, None, op)
- try:
- return qp.parse_query(word, flags)
- except _xapian.QueryParserError, e:
- # If we got a parse error, retry without boolean operators (since
- # these are the usual cause of the parse error).
-            return qp.parse_query(word, 0)
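
IndexManager.flush() in the file removed above batches index writes: it flushes once FLUSH_THRESHOLD mutations have accumulated, and otherwise arms a timer so that a quiet index still gets flushed after FLUSH_TIMEOUT seconds. A toolkit-agnostic sketch of that policy, with threading.Timer standing in for the gobject timeout used in the real code:

# Sketch of the threshold-plus-timeout flush policy used by IndexManager.flush().
import threading

FLUSH_THRESHOLD = 20   # flush after this many changes
FLUSH_TIMEOUT = 60     # ...or this many seconds after the last change

class FlushPolicy(object):
    def __init__(self, do_flush):
        self._do_flush = do_flush
        self._changes = 0
        self._timer = None

    def changed(self, force=False):
        # cancel any pending timeout; we are about to decide again
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._changes += 1
        if force or self._changes > FLUSH_THRESHOLD:
            self._do_flush()
            self._changes = 0
        else:
            # flush anyway if nothing else happens for FLUSH_TIMEOUT seconds
            self._timer = threading.Timer(FLUSH_TIMEOUT, self.changed, [True])
            self._timer.start()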
diff --git a/tests/test_perf.py b/tests/test_perf.py
new file mode 100644
index 0000000..d1e0269
--- /dev/null
+++ b/tests/test_perf.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2007 One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import sys
+import os
+import unittest
+import time
+import tempfile
+import shutil
+from datetime import datetime
+
+import dbus
+
+DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
+DS_DBUS_PATH = "/org/laptop/sugar/DataStore"
+
+PROPS_WITHOUT_PREVIEW = {'activity_id': '37fa2f4013b17ae7fc6448f10fe5df53ef92de18',
+ 'title_set_by_user': '0',
+ 'title': 'Write Activity',
+ 'timestamp': int(time.time()),
+ 'activity': 'org.laptop.AbiWordActivity',
+ 'share-scope': 'private',
+ 'keep': 0,
+ 'icon-color': '#00588C,#00EA11',
+ 'mtime': datetime.now().isoformat(),
+ 'preview': '',
+ 'mime_type': ''}
+
+PROPS_WITH_PREVIEW = {'activity_id': 'e8594bea74faa80539d93ef1a10de3c712bb2eac',
+ 'title_set_by_user': '0',
+ 'title': 'Write Activity',
+ 'share-scope': 'private',
+ 'timestamp': int(time.time()),
+ 'activity': 'org.laptop.AbiWordActivity',
+ 'fulltext': 'mec mac',
+ 'keep': 0,
+ 'icon-color': '#00588C,#00EA11',
+ 'mtime': datetime.now().isoformat(),
+ 'preview': dbus.ByteArray('\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xd8\x00\x00\x00\xa2\x08\x02\x00\x00\x00\xac\xfb\x94\x1d\x00\x00\x00\x03sBIT\x08\x08\x08\xdb\xe1O\xe0\x00\x00\x03\x1dIDATx\x9c\xed\xd6\xbfJci\x00\xc6\xe1\xc4\xb5Qb\xb0Qa\xfc\xb3\x0cV\xda\xc8,\xa4\xf0^,\xbc;\xef@\x04kA\xb3(\x16b\x97 3("\xc9\x14\'\x13\xc5\x9c\x9cq\x8bmg\xdcj\x93\x17\xf3<\xedw\x8a\xf7\x83\x1f\xe7\x9c\xfa\xd1\xd1\xd1\xc6\xc6F\r\xa6\xe4\xf1\xf1\xf1\xfe\xfe~~{{\xbb\xd5jM{\x0c\xb3\xeb\xe1\xe1\xa1\xddn\xcf\xbf\xf3D\xaf\xd7\xbb\xbd\xbdm4\x1aooo\xbb\xbb\xbb\x97\x97\x97;;;\xa3\xd1\xa8\xd3\xe9\xb4Z\xad\xaa\xaa...\x96\x96\x96\xca\xb2\xdc\xdf\xdf???_[[[^^\xbe\xb9\xb9\xd9\xdb\xdbk6\x9b\x13\xbb\t\x1f\xc0{!\xd6\xeb\xf5n\xb7\xfb\xf4\xf4\xb4\xb0\xb0p}}\xbd\xbe\xbe~rrR\x14\xc5\xc1\xc1AY\x96\x8dF\xe3\xea\xea\xaa\xaa\xaa\xc5\xc5\xc5V\xab\xd5\xe9tNOOWVV\x0e\x0f\x0f\x87\xc3\xe1\xc4.\xc0\xc70\xf7\xce\xd9`0(\xcb\xf2\xc7`\xf0\xbd\xdf\xffsk\xebgU\xfd\xf5\xe5K\xb3\xd9\xfc\xbb\xdd\xfecn\xee\xf9\xf9\xf9\xf5\xf5\xb5(\x8a\xe1pxww\xd7\xef\xf7\x8b\xa2X]]=;;\x9b\xd8z>\x8cz\xbb\xdd\xfe\xed?\xe2\xb7o\xb5\xaf_\x7f}\xf4\xe9S\xed\xf3\xe7\xffo\x16\xb3\xe3\xbf\xff\x11k\x9b\x9b\xb5\xcd\xcdI\xeda\xa6\xbd\xf7i\x86\x89\x11"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!\x12A\x88D\x10"\x11\x84H\x04!2e\xe3\xf1x<\x1e\xcfO{\x06\xb3\xee\xf8\xf8\xb8\xd7\xeby#2e///UU\t\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\x08\x91\x08B$\x82\x10\x89 D"\xccw\xbb\xdd\xd1h4\xed\x19\xcc\xae\x7f\xf3\xfb\x07q8\x9emk8\x97\xda\x00\x00\x00\x00IEND\xaeB`\x82'),
+ 'mime_type': 'application/vnd.oasis.opendocument.text'}
+
+def prepare_file():
+ file_path = os.path.join(os.getcwd(), 'tests/funkyabi.odt')
+ f, tmp_path = tempfile.mkstemp()
+ os.close(f)
+ shutil.copyfile(file_path, tmp_path)
+ return tmp_path
+
+bus = dbus.SessionBus()
+proxy = bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH)
+data_store = dbus.Interface(proxy, DS_DBUS_INTERFACE)
+
+uids = []
+
+n = 100
+
+total = 0
+print 'Creating %r entries' % n
+for i in range(n):
+ file_path = prepare_file()
+ t = time.time()
+ uids.append(data_store.create(PROPS_WITHOUT_PREVIEW, file_path, True))
+ total += time.time() - t
+print 'Created %r entries in %.2f ms. avg' % (n, float(total * 1000) / n)
+
+total = 0
+print 'Updating %r entries' % len(uids)
+for uid in uids:
+ file_path = prepare_file()
+ t = time.time()
+ data_store.update(uid, PROPS_WITH_PREVIEW, file_path, True)
+ total += time.time() - t
+print 'Updated %r entries in %.2f ms. avg' % (n, float(total * 1000) / len(uids))
+
+total = 0
+print 'Retrieving %r full entries' % len(uids)
+for uid in uids:
+ t = time.time()
+ props = data_store.get_properties(uid)
+ total += time.time() - t
+print 'Retrieved %r full entries in %.2f ms. avg' % (n, float(total * 1000) / len(uids))
+
+total = 0
+query = {'order_by': ['-mtime'], 'limit': 80}
+properties = ['uid', 'title', 'mtime', 'timestamp', 'keep', 'buddies',
+ 'icon-color', 'mime_type', 'progress', 'activity', 'mountpoint',
+ 'activity_id']
+results, count = data_store.find(query, properties)
+print 'Searching %r times on a journal with %d entries' % (n, count)
+for i in range(n):
+ t = time.time()
+ results, count = data_store.find(query, properties)
+ total += time.time() - t
+print 'Searched %r full entries in %.2f ms. avg' % (n, float(total * 1000) / n)
+
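
The new tests/test_perf.py talks to an already-running datastore over the session bus and repeats the same time-and-accumulate loop for each operation it benchmarks (create, update, get_properties, find). A small helper along these lines (hypothetical, not part of the script) captures that pattern:

# Hypothetical helper for the timing loop repeated in test_perf.py:
# call fn() n times and return the average latency in milliseconds.
import time

def average_ms(fn, n):
    total = 0.0
    for _ in range(n):
        start = time.time()
        fn()
        total += time.time() - start
    return total * 1000.0 / n

# e.g.: average_ms(lambda: data_store.find(query, properties), 100)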
diff --git a/tests/test_sugar.py b/tests/test_sugar.py
index 39a0e8c..1dbb607 100644
--- a/tests/test_sugar.py
+++ b/tests/test_sugar.py
@@ -34,9 +34,9 @@ DS_DBUS_PATH = "/org/laptop/sugar/DataStore"
PROPS_WITHOUT_PREVIEW = {'activity_id': '37fa2f4013b17ae7fc6448f10fe5df53ef92de18',
'title_set_by_user': '0',
'title': 'Write Activity',
- 'timestamp': int(time.time()),
+ 'timestamp': str(int(time.time())),
'activity': 'org.laptop.AbiWordActivity',
- 'share-scope': 'private',
+ 'share-scope': 'private\nmoc',
'keep': '0',
'icon-color': '#00588C,#00EA11',
'mtime': datetime.now().isoformat(),
@@ -47,7 +47,7 @@ PROPS_WITH_PREVIEW = {'activity_id': 'e8594bea74faa80539d93ef1a10de3c712bb2eac',
'title_set_by_user': '0',
'title': 'Write Activity',
'share-scope': 'private',
- 'timestamp': int(time.time()),
+ 'timestamp': str(int(time.time())),
'activity': 'org.laptop.AbiWordActivity',
'fulltext': 'mec mac',
'keep': '0',
@@ -82,7 +82,7 @@ class CommonTest(unittest.TestCase):
query = {'order_by': ['-mtime'],
'limit': 80}
t = time.time()
- results, count = self._data_store.find(query, [])
+ results, count = self._data_store.find(query, ['uid', 'title'])
t = time.time() - t
return t
@@ -106,12 +106,16 @@ class FunctionalityTest(CommonTest):
def testresume(self):
t, uid = self.create()
props = self._data_store.get_properties(uid, byte_arrays=True)
- #del props['uid']
+ del props['uid']
+ del props['mountpoint']
+ del props['checksum']
assert props == PROPS_WITHOUT_PREVIEW
t = self.update(uid)
props = self._data_store.get_properties(uid, byte_arrays=True)
- #del props['uid']
+ del props['uid']
+ del props['mountpoint']
+ del props['checksum']
assert props == PROPS_WITH_PREVIEW
file_name = self._data_store.get_filename(uid)
@@ -120,6 +124,12 @@ class FunctionalityTest(CommonTest):
f = open(file_name, 'r')
f.close()
+ results, count = self._data_store.find({'uid': uid}, ['uid', 'title'],
+ byte_arrays=True)
+ assert count == 1
+ assert results[0]['uid'] == uid
+ assert results[0]['title'] == 'Write Activity'
+
"""
def testcustomproperties(self):
t, uid = self.create()
@@ -130,8 +140,7 @@ class FunctionalityTest(CommonTest):
props = self._data_store.get_properties(uid)
assert props['custom_property'] == 'test'
- results, count = self._data_store.find({'custom_property': 'test'}, [])
- assert count > 1
+ results, count = self._data_store.find({'custom_property': 'test'}, ['custom_property'])
for entry in results:
assert entry['custom_property'] == 'test'
uid = entry['uid']
@@ -140,7 +149,10 @@ class FunctionalityTest(CommonTest):
"""
def testfind(self):
- t = self.find()
+ results, count = self._data_store.find({}, ['uid'])
+ assert count > 0
+
+ print self.find()
class PerformanceTest(CommonTest):
@@ -176,7 +188,7 @@ class PerformanceTest(CommonTest):
if __name__ == '__main__':
suite = unittest.TestSuite()
- #suite.addTest(unittest.makeSuite(FunctionalityTest))
- suite.addTest(unittest.makeSuite(PerformanceTest))
+ suite.addTest(unittest.makeSuite(FunctionalityTest))
+ #suite.addTest(unittest.makeSuite(PerformanceTest))
unittest.TextTestRunner().run(suite)