diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-12 23:59:50 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-12 23:59:50 (GMT) |
commit | b58ff0c10ce8246d8d62a09d0aa6622c059e4d28 (patch) | |
tree | 0445b3752398e7b3b23b791203a9e22e987dc34c | |
parent | 7aae48766ae46bd530a3c556cd4e92a0e02f7ad3 (diff) |
property/type sync up
better join code on indexer
-rw-r--r-- | src/olpc/datastore/backingstore.py | 21 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 13 | ||||
-rw-r--r-- | src/olpc/datastore/model.py | 172 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 68 | ||||
-rw-r--r-- | tests/milestone_1.txt | 2 | ||||
-rw-r--r-- | tests/milestone_2.txt | 16 | ||||
-rw-r--r-- | tests/properties.txt | 9 | ||||
-rw-r--r-- | tests/runalltests.py | 37 | ||||
-rw-r--r-- | tests/test_model.py | 7 | ||||
-rw-r--r-- | tests/testutils.py | 17 |
10 files changed, 181 insertions, 181 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 8ed1011..d7ea1bc 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -377,7 +377,9 @@ class FileBackingStore(BackingStore): def stop(self): self.indexmanager.stop() - + + def complete_indexing(self): + self.indexmanager.complete_indexing() class InplaceFileBackingStore(FileBackingStore): """Like the normal FileBackingStore this Backingstore manages the @@ -429,6 +431,7 @@ class InplaceFileBackingStore(FileBackingStore): for fn in filenames: source = os.path.join(dirpath, fn) relative = source[len(self.uri)+1:] + result, count = self.indexmanager.search(dict(filename=relative)) if not count: # create a new record @@ -436,13 +439,14 @@ class InplaceFileBackingStore(FileBackingStore): else: # update the object with the new content iif the # checksum is different - # XXX: what if there is more than one? (shouldn't happen) - content = result[0] - uid = content + # XXX: what if there is more than one? (shouldn't + # happen) + content = result.next() + uid = content.id # only if the checksum is different - checksum = self._checksum(source) - if checksum != content.checksum: - self.update(uid, dict(filename=relative), source) + #checksum = self._checksum(source) + #if checksum != content.checksum: + self.update(uid, dict(filename=relative), source) @@ -460,7 +464,8 @@ class InplaceFileBackingStore(FileBackingStore): def update(self, uid, props, filelike=None): # the file would have already been changed inplace # don't touch it - self.indexmanager.index(uid, props, filelike) + props['uid'] = uid + self.indexmanager.index(props, filelike) def delete(self, uid): c = self.indexmanager.get(uid) diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index da8ab74..2429637 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -155,7 +155,7 @@ class DataStore(dbus.service.Object): def _resolveMountpoint(self, mountpoint=None): if isinstance(mountpoint, dict): - mountpoint = mountpoint.get('mountpoint') + mountpoint = mountpoint.pop('mountpoint', None) if mountpoint is not None: # this should be the id of a mount point @@ -361,7 +361,7 @@ class DataStore(dbus.service.Object): mountpoints = query.pop('mountpoints', self.mountpoints) mountpoints = [self.mountpoints[str(m)] for m in mountpoints] results = set() - + for mp in mountpoints: result = mp.get_uniquevaluesfor(propertyname) results = results.union(result) @@ -394,8 +394,8 @@ class DataStore(dbus.service.Object): content = self.get(uid) if content: content.backingstore.delete(uid) - self.Deleted(content.id) - logger.debug("deleted %s" % content.id) + self.Deleted(uid) + logger.debug("deleted %s" % uid) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Deleted(self, uid): pass @@ -411,3 +411,8 @@ class DataStore(dbus.service.Object): def Stopped(self): pass + def complete_indexing(self): + """Block waiting for all queued indexing operations to complete""" + for mp in self.mountpoints.itervalues(): + mp.complete_indexing() + diff --git a/src/olpc/datastore/model.py b/src/olpc/datastore/model.py index 5c737ad..4db2ad2 100644 --- a/src/olpc/datastore/model.py +++ b/src/olpc/datastore/model.py @@ -76,66 +76,6 @@ class Property(object): value = property(get_value, set_value) def __str__(self): return str(self.value) - -def noop(value): return value - -# Xapian doesn't have real binary storage, rather these keys will get -# indexed it its database. If the key size is too large the indexing -# will fail -# there are two solutions -- divert the storage to the backingstore -# and retain a key reference to recover it (this is the correct -# solution long term as it participates in versioning) and what I do -# now which is to insert and remove spaces into the base64 stream -# every fixed amount of characters -import re -base64hack = re.compile("(\S{212})") -def base64enc(value): return ' '.join(base64hack.split(value.encode('base64'))) -def base64dec(value): return value.replace(' ', '').decode('base64') - -dateformat = "%Y-%m-%dT%H:%M:%S" -def datedec(value, dateformat=dateformat): - ti = time.strptime(value, dateformat) - dt = datetime.datetime(*(ti[:-2])) - dt = dt.replace(microsecond=0) - return dt - -def dateenc(value, dateformat=dateformat): - if isinstance(value, basestring): - # XXX: there is an issue with microseconds not getting parsed - ti = time.strptime(value, dateformat) - value = datetime.datetime(*(ti[:-2])) - value = value.replace(microsecond=0) - # XXX: drop time for now, this is a xapian issue - value = value.date() - return value.isoformat() - -# syntactic sugar for the below -def p(key, kind, **kwargs): return (key, kind, kwargs) - -# type, get, set, xapian sort type [string|float|date], defaults -# defaults are the default options to addField in IndexManager -# these can be overridden on model assignment -registerPropertyType('string', noop, noop, 'string', {'store' : True, - 'exact' : True, - 'sortable' : True}) - -registerPropertyType('text', noop, noop, 'string', {'store' : True, - 'exact' : False, - 'sortable' : False}) - -registerPropertyType('binary', noop, noop, None, {'store' : True, - 'exact' : False, - 'sortable' : False}) - -registerPropertyType('number', str, float, 'float', {'store' : True, - 'exact' : True, - 'sortable' : True}) - -registerPropertyType('date', dateenc, datedec, 'date', {'store' : True, - 'exact' : True, - 'sortable' : True - }) - class Model(object): """Object containing the field/property model used by the @@ -145,7 +85,7 @@ class Model(object): self.fields = {} self.fieldnames = [] - def addField(self, key, kind, **kwargs): + def addField(self, key, kind, overrides=None): """ Add a field to the model. key -- field name kind -- type by name (registered with registerPropertyType) @@ -158,7 +98,7 @@ class Model(object): impl = propertyByKind(kind) options = impl.defaults.copy() - if kwargs: options.update(kwargs) + if overrides: options.update(overrides) if impl.xapian_sort_type: if 'type' not in options: options['type'] = impl.xapian_sort_type @@ -169,7 +109,7 @@ class Model(object): def addFields(self, *args): """ List of arguments to addField """ - for arg in args: self.addField(arg[0], arg[1], **arg[2]) + for arg in args: self.addField(*arg) return self def apply(self, indexmanager): @@ -178,27 +118,6 @@ class Model(object): args = self.fields[fn] addField(args[0], **args[2]) - -defaultModel = Model().addFields( - p('text', 'text'), - # vid is version id - p('vid', store=True, exact=True, sortable=True, type="float"), - p('filename', store=True, exact=True), - # Title has additional weight - p('title', store=True, exact=False, weight=2, sortable=True), - p('url', store=True, exact=True, sortable=True), - p('mimetype', store=True, exact=True), - p('author', store=True, exact=True), - p('language', store=True, exact=True), - p('ctime', store=True, exact=True, sortable=True, type='date'), - p('mtime', store=True, exact=True, sortable=True, type='date'), - # this will just be a space delimited list of tags - # indexed with the content - # I give them high weight as they have user given semantic value. - p('tags', store=True, exact=False, weight=3, sortable=True), - ) - - class Content(object): """A light weight proxy around Xapian Documents from secore. This provides additional methods which are used in the @@ -283,4 +202,87 @@ class Content(object): ## pass - + +def noop(value): return value + +# Xapian doesn't have real binary storage, rather these keys will get +# indexed it its database. If the key size is too large the indexing +# will fail +# there are two solutions -- divert the storage to the backingstore +# and retain a key reference to recover it (this is the correct +# solution long term as it participates in versioning) and what I do +# now which is to insert and remove spaces into the base64 stream +# every fixed amount of characters +import re +base64hack = re.compile("(\S{212})") +def base64enc(value): return ' '.join(base64hack.split(value.encode('base64'))) +def base64dec(value): return value.replace(' ', '').decode('base64') + +dateformat = "%Y-%m-%dT%H:%M:%S" +def datedec(value, dateformat=dateformat): + ti = time.strptime(value, dateformat) + dt = datetime.datetime(*(ti[:-2])) + dt = dt.replace(microsecond=0) + return dt + +def dateenc(value, dateformat=dateformat): + if isinstance(value, basestring): + # XXX: there is an issue with microseconds not getting parsed + ti = time.strptime(value, dateformat) + value = datetime.datetime(*(ti[:-2])) + value = value.replace(microsecond=0) + # XXX: drop time for now, this is a xapian issue + value = value.date() + return value.isoformat() + +# type, get, set, xapian sort type [string|float|date], defaults +# defaults are the default options to addField in IndexManager +# these can be overridden on model assignment +registerPropertyType('string', noop, noop, 'string', {'store' : True, + 'exact' : True, + 'sortable' : True}) + +registerPropertyType('text', noop, noop, 'string', {'store' : True, + 'exact' : False, + 'sortable' : False}) + +registerPropertyType('binary', noop, noop, None, {'store' : True, + 'exact' : False, + 'sortable' : False}) + +registerPropertyType('int', str, int, 'float', {'store' : True, + 'exact' : True, + 'sortable' : True}) + +registerPropertyType('number', str, float, 'float', {'store' : True, + 'exact' : True, + 'sortable' : True}) + +registerPropertyType('date', dateenc, datedec, 'date', {'store' : True, + 'exact' : True, + 'sortable' : True + }) + + + +defaultModel = Model().addFields( + ('text', 'text'), + # vid is version id + ('vid', 'number'), + ('checksum', 'string'), + ('filename', 'string'), + # Title has additional weight + ('title', 'text', {'weight' : 2 }), + ('url', 'string'), + ('mimetype', 'string'), + ('author', 'string'), + ('language', 'string'), + ('ctime', 'date'), + ('mtime', 'date'), + # this will just be a space delimited list of tags + # indexed with the content + # I give them high weight as they have user given semantic value. + ('tags', 'text', {'weight' :3 } ), + ) + + diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index b02f4af..8695cb9 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -28,6 +28,12 @@ from olpc.datastore.utils import create_uid # Setup Logger logger = logging.getLogger('org.sugar.datastore.xapianindex') +# Indexer Operations +CREATE = 1 +UPDATE = 2 +DELETE = 3 + + class ContentMappingIter(object): """An iterator over a set of results from a search. @@ -103,8 +109,7 @@ class IndexManager(object): # Index thread management def startIndexer(self): self.indexer_running = True - self.indexer = threading.Thread(target=self.indexThread, - name="XapianIndexer") + self.indexer = threading.Thread(target=self.indexThread) self.indexer.setDaemon(True) self.indexer.start() @@ -114,8 +119,8 @@ class IndexManager(object): self.indexer_running = False self.indexer.join() - def enque(self, uid, vid, doc, created): - self.queue.put((uid, vid, doc, created)) + def enque(self, uid, vid, doc, operation): + self.queue.put((uid, vid, doc, operation)) def indexThread(self): # process the queue @@ -123,31 +128,40 @@ class IndexManager(object): # for example if a USB stick is added and quickly removed # the mount should however get a stop() call which would # request that the indexing finish - logger = logging.getLogger('org.sugar.datastore.xapianindex.indexThread') while self.indexer_running: # include timeout here to ease shutdown of the thread # if this is a non-issue we can simply allow it to block try: - uid, vid, doc, created = self.queue.get(timeout=0.5) + uid, vid, doc, operation = self.queue.get(timeout=0.5) - if created: self.write_index.add(doc) - else: self.write_index.replace(doc) + if operation is CREATE: self.write_index.add(doc) + elif operation is UPDATE: self.write_index.replace(doc) + elif operation is DELETE: self.write_index.delete(uid) + else: + logger.warning("Unknown indexer operation ( %s: %s)" % \ + (uid, operation)) + continue # XXX: if there is still work in the queue we could # delay the flush() - self.flush() + #if self.queue.empty(): self.flush() logger.info("Indexed Content %s:%s" % (uid, vid)) self.queue.task_done() except Empty: pass - except: - logger.exception("Error in index thread. Attempting recovery") - try: self.write_index.close() - except: pass - self.write_index = secore.IndexerConnection(self.repo) - self.read_index.reopen() - +## except: +## try: self.write_index.close() +## except: pass +## try: +## self.write_index = secore.IndexerConnection(self.repo) +## self.read_index.reopen() +## except: +## # Shut down the indexer +## logger.critical("Indexer Failed, Shutting it down") +## self.indexer_running = False + + @property @@ -160,6 +174,7 @@ class IndexManager(object): primarily in testing. """ self.queue.join() + self.flush() # # Field management @@ -188,7 +203,7 @@ class IndexManager(object): # # Index Functions - def mapProperties(self, props): + def _mapProperties(self, props): """data normalization function, maps dicts of key:kind->value to Property objects """ @@ -203,11 +218,11 @@ class IndexManager(object): Props must contain the following: key -> Property() """ - props = self.mapProperties(props) + props = self._mapProperties(props) doc = secore.UnprocessedDocument() add = doc.fields.append fp = None - created = False + operation = UPDATE if filename: mimetype = props.get("mimetype") @@ -231,9 +246,9 @@ class IndexManager(object): if uid: uid = uid.value else: uid = create_uid() - created = True + operation = CREATE - if vid: vid = vid.value + if vid: vid = str(float(vid.value) + 1.0) else: vid = "1.0" doc.id = uid @@ -252,7 +267,7 @@ class IndexManager(object): add(secore.Field(k, value)) # queue the document for processing - self.enque(uid, vid, doc, created) + self.enque(uid, vid, doc, operation) return uid @@ -265,8 +280,7 @@ class IndexManager(object): # does this need queuing? # the higher level abstractions have to handle interaction # with versioning policy and so on - self.write_index.delete(uid) - self.flush() + self.enque(uid, None, None, DELETE) # # Search @@ -317,10 +331,10 @@ class IndexManager(object): # different python type this has to happen now descriptor = self.datamodel.fields.get(property) if descriptor: - kind = descriptor[1].get('type', 'string') + kind = descriptor[1] impl = model.propertyByKind(kind) - r = set([impl.get(i) for i in r]) - + r = set([impl.set(i) for i in r]) + return r def parse_query(self, query): diff --git a/tests/milestone_1.txt b/tests/milestone_1.txt index 2472260..48d09bc 100644 --- a/tests/milestone_1.txt +++ b/tests/milestone_1.txt @@ -87,6 +87,4 @@ We can also remove the file from the repository. This is the basis of milestone 1. >>> ds.stop() ->>> del ds - >>> assert os.system('rm -rf /tmp/test_ds') == 0 diff --git a/tests/milestone_2.txt b/tests/milestone_2.txt index 516d497..35ad9ec 100644 --- a/tests/milestone_2.txt +++ b/tests/milestone_2.txt @@ -7,17 +7,22 @@ First clean up from any other tests. >>> assert os.system('rm -rf /tmp/test_ds/') == 0 >>> from olpc.datastore import DataStore ->>> from olpc.datastore import backingstore +>>> from olpc.datastore import backingstore, model >>> ds = DataStore() >>> ds.registerBackend(backingstore.FileBackingStore) ->>> assert ds.mount("/tmp/test_ds") +>>> dm = model.defaultModel.addField('year', 'int').addField('month', 'string') ->>> a = ds.create(dict(title="Content A", author="Bob", year=1999, month="Jan"), '') ->>> b = ds.create(dict(title="Content B", author="Alice", year=2000, month="Jan"), '') +>>> assert ds.mount("/tmp/test_ds", {'indexmanager.model' : dm}) + +>>> a = ds.create(dict(title="Content A", author="Bob", year="1999", month="Jan"), '') +>>> b = ds.create(dict(title="Content B", author="Alice", year="2000", month="Jan"), '') Find should return both >>> def find2uids(results): return [i['uid'] for i in results[0]] ->>> assert set(find2uids(ds.find({}))) == set([a,b]) + +>>> ds.complete_indexing() + +>>> assert set(find2uids(ds.find())) == set([a,b]) But what if we want the results ordered? @@ -35,3 +40,4 @@ and if we want to reverse order it? >>> ds.stop() >>> del ds +>>> assert os.system('rm -rf /tmp/test_ds/') == 0 diff --git a/tests/properties.txt b/tests/properties.txt index dd93b69..6067aac 100644 --- a/tests/properties.txt +++ b/tests/properties.txt @@ -19,7 +19,7 @@ Set up two mount points. Extend the model to retain a 'year' property used below. ->>> dm = model.defaultModel.addField('year', store=True, exact=True, sortable=True, type="float") +>>> dm = model.defaultModel.addField('year', "int") Mount a couple of stores. @@ -35,15 +35,14 @@ Create some content on each. >>> u4 = ds.create({'title' : "Gamma doc", 'author' : "HAL", 'year:number' : 2001, 'mountpoint' : mp2}, tmpData("""Document 4""")) Now we should be able to discover things about the system properties. +>>> ds.complete_indexing() Here we test that we can extract the unique values for certain properties. >>> assert set(ds.get_uniquevaluesfor('author')) == set(['Ben', 'HAL']) -Here we try to gather the values for the property year. We'd expect -these values to come back as numbers, however in the current -implementation they are stored as unicode values. +Here we try to gather the values for the property year. ->>> assert set(ds.get_uniquevaluesfor('year')) == set([u'2000', u'2001']) +>>> assert set(ds.get_uniquevaluesfor('year')) == set([2000, 2001]) diff --git a/tests/runalltests.py b/tests/runalltests.py index 28802ec..02034b9 100644 --- a/tests/runalltests.py +++ b/tests/runalltests.py @@ -21,7 +21,7 @@ doctests = [ resource_filename(__name__, "sugar_demo_may17.txt"), resource_filename(__name__, "milestone_2.txt"), resource_filename(__name__, "mountpoints.txt"), - resource_filename(__name__, "properties.txt") + resource_filename(__name__, "properties.txt"), ] @@ -29,40 +29,25 @@ doctest_options = doctest.ELLIPSIS doctest_options |= doctest.REPORT_ONLY_FIRST_FAILURE -# IF YOU ARE NOT GETTING THE RESULTS YOU EXPECT WHILE TESTING -# THIS IS THE LIKELY CAUSE -# :: Use distutils to modify the pythonpath for inplace testing -# using the build directory -from distutils.util import get_platform -plat_specifier = ".%s-%s" % (get_platform(), sys.version[0:3]) -build_platlib = os.path.join("build", 'lib' + plat_specifier) -test_lib = os.path.join(os.path.abspath(".."), build_platlib) -sys.path.insert(0, test_lib) -# END PATH ADJUSTMENT CODE - - - -def tearDownDS(test): - # and remove the test repository used in some tests - os.system('rm -rf /tmp/test_ds') - def test_suite(): + global doctests suite = unittest.TestSuite() if len(sys.argv) > 1: doctests = sys.argv[1:] for dt in doctests: suite.addTest(doctest.DocFileSuite(dt, - optionflags=doctest_options, tearDown=tearDownDS)) + optionflags=doctest_options)) - tests = os.listdir(os.curdir) - tests = [n[:-3] for n in tests if n.startswith('test') and - n.endswith('.py')] + if len(sys.argv) <= 1: + tests = os.listdir(os.curdir) + tests = [n[:-3] for n in tests if n.startswith('test') and + n.endswith('.py')] - for test in tests: - m = __import__(test) - if hasattr(m, 'test_suite'): - suite.addTest(m.test_suite()) + for test in tests: + m = __import__(test) + if hasattr(m, 'test_suite'): + suite.addTest(m.test_suite()) return suite diff --git a/tests/test_model.py b/tests/test_model.py index d7aea45..2f44b41 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -30,10 +30,8 @@ class Test(unittest.TestCase): ds.registerBackend(backingstore.FileBackingStore) #add a custom field to the model - dm = model.defaultModel.addField('thumbnail', - store=True, - exact=False, - sortable=False) + dm = model.defaultModel.addField('thumbnail', 'binary') + ds.mount(DEFAULT_STORE, {'indexmanager.model' : dm}) @@ -41,6 +39,7 @@ class Test(unittest.TestCase): data = open('test.jpg', 'r').read() # binary data with \0's in it can cause dbus errors here fn = tmpData("with image\0\0 prop") + # XXX: We should be able to remove:binary now uid = ds.create({'title' : "Document 1", 'thumbnail:binary' : data}, fn) waitforindex(ds) diff --git a/tests/testutils.py b/tests/testutils.py index 48d1060..a4efc0a 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -1,9 +1,5 @@ import tempfile import os -import time - -from olpc.datastore.xapianindex import IndexManager -from olpc.datastore.datastore import DataStore def tmpData(data): """Put data into a temporary file returning the filename """ @@ -12,16 +8,7 @@ def tmpData(data): os.close(fd) return fn -def waitforindex(obj, interval=0.1): +def waitforindex(obj): # wait for any/all index managers associated with object to finish # indexing so that tests can do there thing - if isinstance(obj, IndexManager): - obj.complete_indexing() - elif isinstance(obj, DataStore): - for mp in obj.mountpoints.values(): - im = mp.indexmanager - im.complete_indexing() - else: - # backingstore - obj.indexmanager.complete_indexing() - + obj.complete_indexing() |