Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary   ·   refs   ·   log   ·   tree   ·   commit   ·   diff   ·   stats
diff options
context:
space:
mode:
author Benjamin Saller <bcsaller@objectrealms.net> 2007-07-12 23:59:50 (GMT)
committer Benjamin Saller <bcsaller@objectrealms.net> 2007-07-12 23:59:50 (GMT)
commit b58ff0c10ce8246d8d62a09d0aa6622c059e4d28 (patch)
tree 0445b3752398e7b3b23b791203a9e22e987dc34c
parent 7aae48766ae46bd530a3c556cd4e92a0e02f7ad3 (diff)
property/type sync up
better join code on indexer
-rw-r--r-- src/olpc/datastore/backingstore.py 21
-rw-r--r-- src/olpc/datastore/datastore.py 13
-rw-r--r-- src/olpc/datastore/model.py 172
-rw-r--r-- src/olpc/datastore/xapianindex.py 68
-rw-r--r-- tests/milestone_1.txt 2
-rw-r--r-- tests/milestone_2.txt 16
-rw-r--r-- tests/properties.txt 9
-rw-r--r-- tests/runalltests.py 37
-rw-r--r-- tests/test_model.py 7
-rw-r--r-- tests/testutils.py 17
10 files changed, 181 insertions, 181 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 8ed1011..d7ea1bc 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -377,7 +377,9 @@ class FileBackingStore(BackingStore):
def stop(self):
self.indexmanager.stop()
-
+
+ def complete_indexing(self):
+ self.indexmanager.complete_indexing()
class InplaceFileBackingStore(FileBackingStore):
"""Like the normal FileBackingStore this Backingstore manages the
@@ -429,6 +431,7 @@ class InplaceFileBackingStore(FileBackingStore):
for fn in filenames:
source = os.path.join(dirpath, fn)
relative = source[len(self.uri)+1:]
+
result, count = self.indexmanager.search(dict(filename=relative))
if not count:
# create a new record
@@ -436,13 +439,14 @@ class InplaceFileBackingStore(FileBackingStore):
else:
# update the object with the new content iif the
# checksum is different
- # XXX: what if there is more than one? (shouldn't happen)
- content = result[0]
- uid = content
+ # XXX: what if there is more than one? (shouldn't
+ # happen)
+ content = result.next()
+ uid = content.id
# only if the checksum is different
- checksum = self._checksum(source)
- if checksum != content.checksum:
- self.update(uid, dict(filename=relative), source)
+ #checksum = self._checksum(source)
+ #if checksum != content.checksum:
+ self.update(uid, dict(filename=relative), source)
@@ -460,7 +464,8 @@ class InplaceFileBackingStore(FileBackingStore):
def update(self, uid, props, filelike=None):
# the file would have already been changed inplace
# don't touch it
- self.indexmanager.index(uid, props, filelike)
+ props['uid'] = uid
+ self.indexmanager.index(props, filelike)
def delete(self, uid):
c = self.indexmanager.get(uid)
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index da8ab74..2429637 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -155,7 +155,7 @@ class DataStore(dbus.service.Object):
def _resolveMountpoint(self, mountpoint=None):
if isinstance(mountpoint, dict):
- mountpoint = mountpoint.get('mountpoint')
+ mountpoint = mountpoint.pop('mountpoint', None)
if mountpoint is not None:
# this should be the id of a mount point
@@ -361,7 +361,7 @@ class DataStore(dbus.service.Object):
mountpoints = query.pop('mountpoints', self.mountpoints)
mountpoints = [self.mountpoints[str(m)] for m in mountpoints]
results = set()
-
+
for mp in mountpoints:
result = mp.get_uniquevaluesfor(propertyname)
results = results.union(result)
@@ -394,8 +394,8 @@ class DataStore(dbus.service.Object):
content = self.get(uid)
if content:
content.backingstore.delete(uid)
- self.Deleted(content.id)
- logger.debug("deleted %s" % content.id)
+ self.Deleted(uid)
+ logger.debug("deleted %s" % uid)
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Deleted(self, uid): pass
@@ -411,3 +411,8 @@ class DataStore(dbus.service.Object):
def Stopped(self): pass
+ def complete_indexing(self):
+ """Block waiting for all queued indexing operations to complete"""
+ for mp in self.mountpoints.itervalues():
+ mp.complete_indexing()
+
diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py
index 5c737ad..4db2ad2 100644
--- a/src/olpc/datastore/model.py
+++ b/src/olpc/datastore/model.py
@@ -76,66 +76,6 @@ class Property(object):
value = property(get_value, set_value)
def __str__(self): return str(self.value)
-
-def noop(value): return value
-
-# Xapian doesn't have real binary storage, rather these keys will get
-# indexed it its database. If the key size is too large the indexing
-# will fail
-# there are two solutions -- divert the storage to the backingstore
-# and retain a key reference to recover it (this is the correct
-# solution long term as it participates in versioning) and what I do
-# now which is to insert and remove spaces into the base64 stream
-# every fixed amount of characters
-import re
-base64hack = re.compile("(\S{212})")
-def base64enc(value): return ' '.join(base64hack.split(value.encode('base64')))
-def base64dec(value): return value.replace(' ', '').decode('base64')
-
-dateformat = "%Y-%m-%dT%H:%M:%S"
-def datedec(value, dateformat=dateformat):
- ti = time.strptime(value, dateformat)
- dt = datetime.datetime(*(ti[:-2]))
- dt = dt.replace(microsecond=0)
- return dt
-
-def dateenc(value, dateformat=dateformat):
- if isinstance(value, basestring):
- # XXX: there is an issue with microseconds not getting parsed
- ti = time.strptime(value, dateformat)
- value = datetime.datetime(*(ti[:-2]))
- value = value.replace(microsecond=0)
- # XXX: drop time for now, this is a xapian issue
- value = value.date()
- return value.isoformat()
-
-# syntactic sugar for the below
-def p(key, kind, **kwargs): return (key, kind, kwargs)
-
-# type, get, set, xapian sort type [string|float|date], defaults
-# defaults are the default options to addField in IndexManager
-# these can be overridden on model assignment
-registerPropertyType('string', noop, noop, 'string', {'store' : True,
- 'exact' : True,
- 'sortable' : True})
-
-registerPropertyType('text', noop, noop, 'string', {'store' : True,
- 'exact' : False,
- 'sortable' : False})
-
-registerPropertyType('binary', noop, noop, None, {'store' : True,
- 'exact' : False,
- 'sortable' : False})
-
-registerPropertyType('number', str, float, 'float', {'store' : True,
- 'exact' : True,
- 'sortable' : True})
-
-registerPropertyType('date', dateenc, datedec, 'date', {'store' : True,
- 'exact' : True,
- 'sortable' : True
- })
-
class Model(object):
"""Object containing the field/property model used by the
@@ -145,7 +85,7 @@ class Model(object):
self.fields = {}
self.fieldnames = []
- def addField(self, key, kind, **kwargs):
+ def addField(self, key, kind, overrides=None):
""" Add a field to the model.
key -- field name
kind -- type by name (registered with registerPropertyType)
@@ -158,7 +98,7 @@ class Model(object):
impl = propertyByKind(kind)
options = impl.defaults.copy()
- if kwargs: options.update(kwargs)
+ if overrides: options.update(overrides)
if impl.xapian_sort_type:
if 'type' not in options:
options['type'] = impl.xapian_sort_type
@@ -169,7 +109,7 @@ class Model(object):
def addFields(self, *args):
""" List of arguments to addField """
- for arg in args: self.addField(arg[0], arg[1], **arg[2])
+ for arg in args: self.addField(*arg)
return self
def apply(self, indexmanager):
@@ -178,27 +118,6 @@ class Model(object):
args = self.fields[fn]
addField(args[0], **args[2])
-
-defaultModel = Model().addFields(
- p('text', 'text'),
- # vid is version id
- p('vid', store=True, exact=True, sortable=True, type="float"),
- p('filename', store=True, exact=True),
- # Title has additional weight
- p('title', store=True, exact=False, weight=2, sortable=True),
- p('url', store=True, exact=True, sortable=True),
- p('mimetype', store=True, exact=True),
- p('author', store=True, exact=True),
- p('language', store=True, exact=True),
- p('ctime', store=True, exact=True, sortable=True, type='date'),
- p('mtime', store=True, exact=True, sortable=True, type='date'),
- # this will just be a space delimited list of tags
- # indexed with the content
- # I give them high weight as they have user given semantic value.
- p('tags', store=True, exact=False, weight=3, sortable=True),
- )
-
-
class Content(object):
"""A light weight proxy around Xapian Documents from secore.
This provides additional methods which are used in the
@@ -283,4 +202,87 @@ class Content(object):
## pass
-
+
+def noop(value): return value
+
+# Xapian doesn't have real binary storage, rather these keys will get
+# indexed it its database. If the key size is too large the indexing
+# will fail
+# there are two solutions -- divert the storage to the backingstore
+# and retain a key reference to recover it (this is the correct
+# solution long term as it participates in versioning) and what I do
+# now which is to insert and remove spaces into the base64 stream
+# every fixed amount of characters
+import re
+base64hack = re.compile("(\S{212})")
+def base64enc(value): return ' '.join(base64hack.split(value.encode('base64')))
+def base64dec(value): return value.replace(' ', '').decode('base64')
+
+dateformat = "%Y-%m-%dT%H:%M:%S"
+def datedec(value, dateformat=dateformat):
+ ti = time.strptime(value, dateformat)
+ dt = datetime.datetime(*(ti[:-2]))
+ dt = dt.replace(microsecond=0)
+ return dt
+
+def dateenc(value, dateformat=dateformat):
+ if isinstance(value, basestring):
+ # XXX: there is an issue with microseconds not getting parsed
+ ti = time.strptime(value, dateformat)
+ value = datetime.datetime(*(ti[:-2]))
+ value = value.replace(microsecond=0)
+ # XXX: drop time for now, this is a xapian issue
+ value = value.date()
+ return value.isoformat()
+
+# type, get, set, xapian sort type [string|float|date], defaults
+# defaults are the default options to addField in IndexManager
+# these can be overridden on model assignment
+registerPropertyType('string', noop, noop, 'string', {'store' : True,
+ 'exact' : True,
+ 'sortable' : True})
+
+registerPropertyType('text', noop, noop, 'string', {'store' : True,
+ 'exact' : False,
+ 'sortable' : False})
+
+registerPropertyType('binary', noop, noop, None, {'store' : True,
+ 'exact' : False,
+ 'sortable' : False})
+
+registerPropertyType('int', str, int, 'float', {'store' : True,
+ 'exact' : True,
+ 'sortable' : True})
+
+registerPropertyType('number', str, float, 'float', {'store' : True,
+ 'exact' : True,
+ 'sortable' : True})
+
+registerPropertyType('date', dateenc, datedec, 'date', {'store' : True,
+ 'exact' : True,
+ 'sortable' : True
+ })
+
+
+
+defaultModel = Model().addFields(
+ ('text', 'text'),
+ # vid is version id
+ ('vid', 'number'),
+ ('checksum', 'string'),
+ ('filename', 'string'),
+ # Title has additional weight
+ ('title', 'text', {'weight' : 2 }),
+ ('url', 'string'),
+ ('mimetype', 'string'),
+ ('author', 'string'),
+ ('language', 'string'),
+ ('ctime', 'date'),
+ ('mtime', 'date'),
+ # this will just be a space delimited list of tags
+ # indexed with the content
+ # I give them high weight as they have user given semantic value.
+ ('tags', 'text', {'weight' :3 } ),
+ )
+
+
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index b02f4af..8695cb9 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -28,6 +28,12 @@ from olpc.datastore.utils import create_uid
# Setup Logger
logger = logging.getLogger('org.sugar.datastore.xapianindex')
+# Indexer Operations
+CREATE = 1
+UPDATE = 2
+DELETE = 3
+
+
class ContentMappingIter(object):
"""An iterator over a set of results from a search.
@@ -103,8 +109,7 @@ class IndexManager(object):
# Index thread management
def startIndexer(self):
self.indexer_running = True
- self.indexer = threading.Thread(target=self.indexThread,
- name="XapianIndexer")
+ self.indexer = threading.Thread(target=self.indexThread)
self.indexer.setDaemon(True)
self.indexer.start()
@@ -114,8 +119,8 @@ class IndexManager(object):
self.indexer_running = False
self.indexer.join()
- def enque(self, uid, vid, doc, created):
- self.queue.put((uid, vid, doc, created))
+ def enque(self, uid, vid, doc, operation):
+ self.queue.put((uid, vid, doc, operation))
def indexThread(self):
# process the queue
@@ -123,31 +128,40 @@ class IndexManager(object):
# for example if a USB stick is added and quickly removed
# the mount should however get a stop() call which would
# request that the indexing finish
- logger = logging.getLogger('org.sugar.datastore.xapianindex.indexThread')
while self.indexer_running:
# include timeout here to ease shutdown of the thread
# if this is a non-issue we can simply allow it to block
try:
- uid, vid, doc, created = self.queue.get(timeout=0.5)
+ uid, vid, doc, operation = self.queue.get(timeout=0.5)
- if created: self.write_index.add(doc)
- else: self.write_index.replace(doc)
+ if operation is CREATE: self.write_index.add(doc)
+ elif operation is UPDATE: self.write_index.replace(doc)
+ elif operation is DELETE: self.write_index.delete(uid)
+ else:
+ logger.warning("Unknown indexer operation ( %s: %s)" % \
+ (uid, operation))
+ continue
# XXX: if there is still work in the queue we could
# delay the flush()
- self.flush()
+ #if self.queue.empty(): self.flush()
logger.info("Indexed Content %s:%s" % (uid, vid))
self.queue.task_done()
except Empty:
pass
- except:
- logger.exception("Error in index thread. Attempting recovery")
- try: self.write_index.close()
- except: pass
- self.write_index = secore.IndexerConnection(self.repo)
- self.read_index.reopen()
-
+## except:
+## try: self.write_index.close()
+## except: pass
+## try:
+## self.write_index = secore.IndexerConnection(self.repo)
+## self.read_index.reopen()
+## except:
+## # Shut down the indexer
+## logger.critical("Indexer Failed, Shutting it down")
+## self.indexer_running = False
+
+
@property
@@ -160,6 +174,7 @@ class IndexManager(object):
primarily in testing.
"""
self.queue.join()
+ self.flush()
#
# Field management
@@ -188,7 +203,7 @@ class IndexManager(object):
#
# Index Functions
- def mapProperties(self, props):
+ def _mapProperties(self, props):
"""data normalization function, maps dicts of key:kind->value
to Property objects
"""
@@ -203,11 +218,11 @@ class IndexManager(object):
Props must contain the following:
key -> Property()
"""
- props = self.mapProperties(props)
+ props = self._mapProperties(props)
doc = secore.UnprocessedDocument()
add = doc.fields.append
fp = None
- created = False
+ operation = UPDATE
if filename:
mimetype = props.get("mimetype")
@@ -231,9 +246,9 @@ class IndexManager(object):
if uid: uid = uid.value
else:
uid = create_uid()
- created = True
+ operation = CREATE
- if vid: vid = vid.value
+ if vid: vid = str(float(vid.value) + 1.0)
else: vid = "1.0"
doc.id = uid
@@ -252,7 +267,7 @@ class IndexManager(object):
add(secore.Field(k, value))
# queue the document for processing
- self.enque(uid, vid, doc, created)
+ self.enque(uid, vid, doc, operation)
return uid
@@ -265,8 +280,7 @@ class IndexManager(object):
# does this need queuing?
# the higher level abstractions have to handle interaction
# with versioning policy and so on
- self.write_index.delete(uid)
- self.flush()
+ self.enque(uid, None, None, DELETE)
#
# Search
@@ -317,10 +331,10 @@ class IndexManager(object):
# different python type this has to happen now
descriptor = self.datamodel.fields.get(property)
if descriptor:
- kind = descriptor[1].get('type', 'string')
+ kind = descriptor[1]
impl = model.propertyByKind(kind)
- r = set([impl.get(i) for i in r])
-
+ r = set([impl.set(i) for i in r])
+
return r
def parse_query(self, query):
diff --git a/tests/milestone_1.txt b/tests/milestone_1.txt
index 2472260..48d09bc 100644
--- a/tests/milestone_1.txt
+++ b/tests/milestone_1.txt
@@ -87,6 +87,4 @@ We can also remove the file from the repository.
This is the basis of milestone 1.
>>> ds.stop()
->>> del ds
-
>>> assert os.system('rm -rf /tmp/test_ds') == 0
diff --git a/tests/milestone_2.txt b/tests/milestone_2.txt
index 516d497..35ad9ec 100644
--- a/tests/milestone_2.txt
+++ b/tests/milestone_2.txt
@@ -7,17 +7,22 @@ First clean up from any other tests.
>>> assert os.system('rm -rf /tmp/test_ds/') == 0
>>> from olpc.datastore import DataStore
->>> from olpc.datastore import backingstore
+>>> from olpc.datastore import backingstore, model
>>> ds = DataStore()
>>> ds.registerBackend(backingstore.FileBackingStore)
->>> assert ds.mount("/tmp/test_ds")
+>>> dm = model.defaultModel.addField('year', 'int').addField('month', 'string')
->>> a = ds.create(dict(title="Content A", author="Bob", year=1999, month="Jan"), '')
->>> b = ds.create(dict(title="Content B", author="Alice", year=2000, month="Jan"), '')
+>>> assert ds.mount("/tmp/test_ds", {'indexmanager.model' : dm})
+
+>>> a = ds.create(dict(title="Content A", author="Bob", year="1999", month="Jan"), '')
+>>> b = ds.create(dict(title="Content B", author="Alice", year="2000", month="Jan"), '')
Find should return both
>>> def find2uids(results): return [i['uid'] for i in results[0]]
->>> assert set(find2uids(ds.find({}))) == set([a,b])
+
+>>> ds.complete_indexing()
+
+>>> assert set(find2uids(ds.find())) == set([a,b])
But what if we want the results ordered?
@@ -35,3 +40,4 @@ and if we want to reverse order it?
>>> ds.stop()
>>> del ds
+>>> assert os.system('rm -rf /tmp/test_ds/') == 0
diff --git a/tests/properties.txt b/tests/properties.txt
index dd93b69..6067aac 100644
--- a/tests/properties.txt
+++ b/tests/properties.txt
@@ -19,7 +19,7 @@ Set up two mount points.
Extend the model to retain a 'year' property used below.
->>> dm = model.defaultModel.addField('year', store=True, exact=True, sortable=True, type="float")
+>>> dm = model.defaultModel.addField('year', "int")
Mount a couple of stores.
@@ -35,15 +35,14 @@ Create some content on each.
>>> u4 = ds.create({'title' : "Gamma doc", 'author' : "HAL", 'year:number' : 2001, 'mountpoint' : mp2}, tmpData("""Document 4"""))
Now we should be able to discover things about the system properties.
+>>> ds.complete_indexing()
Here we test that we can extract the unique values for certain properties.
>>> assert set(ds.get_uniquevaluesfor('author')) == set(['Ben', 'HAL'])
-Here we try to gather the values for the property year. We'd expect
-these values to come back as numbers, however in the current
-implementation they are stored as unicode values.
+Here we try to gather the values for the property year.
->>> assert set(ds.get_uniquevaluesfor('year')) == set([u'2000', u'2001'])
+>>> assert set(ds.get_uniquevaluesfor('year')) == set([2000, 2001])
diff --git a/tests/runalltests.py b/tests/runalltests.py
index 28802ec..02034b9 100644
--- a/tests/runalltests.py
+++ b/tests/runalltests.py
@@ -21,7 +21,7 @@ doctests = [
resource_filename(__name__, "sugar_demo_may17.txt"),
resource_filename(__name__, "milestone_2.txt"),
resource_filename(__name__, "mountpoints.txt"),
- resource_filename(__name__, "properties.txt")
+ resource_filename(__name__, "properties.txt"),
]
@@ -29,40 +29,25 @@ doctest_options = doctest.ELLIPSIS
doctest_options |= doctest.REPORT_ONLY_FIRST_FAILURE
-# IF YOU ARE NOT GETTING THE RESULTS YOU EXPECT WHILE TESTING
-# THIS IS THE LIKELY CAUSE
-# :: Use distutils to modify the pythonpath for inplace testing
-# using the build directory
-from distutils.util import get_platform
-plat_specifier = ".%s-%s" % (get_platform(), sys.version[0:3])
-build_platlib = os.path.join("build", 'lib' + plat_specifier)
-test_lib = os.path.join(os.path.abspath(".."), build_platlib)
-sys.path.insert(0, test_lib)
-# END PATH ADJUSTMENT CODE
-
-
-
-def tearDownDS(test):
- # and remove the test repository used in some tests
- os.system('rm -rf /tmp/test_ds')
-
def test_suite():
+ global doctests
suite = unittest.TestSuite()
if len(sys.argv) > 1:
doctests = sys.argv[1:]
for dt in doctests:
suite.addTest(doctest.DocFileSuite(dt,
- optionflags=doctest_options, tearDown=tearDownDS))
+ optionflags=doctest_options))
- tests = os.listdir(os.curdir)
- tests = [n[:-3] for n in tests if n.startswith('test') and
- n.endswith('.py')]
+ if len(sys.argv) <= 1:
+ tests = os.listdir(os.curdir)
+ tests = [n[:-3] for n in tests if n.startswith('test') and
+ n.endswith('.py')]
- for test in tests:
- m = __import__(test)
- if hasattr(m, 'test_suite'):
- suite.addTest(m.test_suite())
+ for test in tests:
+ m = __import__(test)
+ if hasattr(m, 'test_suite'):
+ suite.addTest(m.test_suite())
return suite
diff --git a/tests/test_model.py b/tests/test_model.py
index d7aea45..2f44b41 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -30,10 +30,8 @@ class Test(unittest.TestCase):
ds.registerBackend(backingstore.FileBackingStore)
#add a custom field to the model
- dm = model.defaultModel.addField('thumbnail',
- store=True,
- exact=False,
- sortable=False)
+ dm = model.defaultModel.addField('thumbnail', 'binary')
+
ds.mount(DEFAULT_STORE, {'indexmanager.model' : dm})
@@ -41,6 +39,7 @@ class Test(unittest.TestCase):
data = open('test.jpg', 'r').read()
# binary data with \0's in it can cause dbus errors here
fn = tmpData("with image\0\0 prop")
+ # XXX: We should be able to remove:binary now
uid = ds.create({'title' : "Document 1", 'thumbnail:binary' : data}, fn)
waitforindex(ds)
diff --git a/tests/testutils.py b/tests/testutils.py
index 48d1060..a4efc0a 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -1,9 +1,5 @@
import tempfile
import os
-import time
-
-from olpc.datastore.xapianindex import IndexManager
-from olpc.datastore.datastore import DataStore
def tmpData(data):
"""Put data into a temporary file returning the filename """
@@ -12,16 +8,7 @@ def tmpData(data):
os.close(fd)
return fn
-def waitforindex(obj, interval=0.1):
+def waitforindex(obj):
# wait for any/all index managers associated with object to finish
# indexing so that tests can do there thing
- if isinstance(obj, IndexManager):
- obj.complete_indexing()
- elif isinstance(obj, DataStore):
- for mp in obj.mountpoints.values():
- im = mp.indexmanager
- im.complete_indexing()
- else:
- # backingstore
- obj.indexmanager.complete_indexing()
-
+ obj.complete_indexing()