From f1f0bbdfb486639812b0e04389feb67b83540b73 Mon Sep 17 00:00:00 2001 From: Benjamin Saller Date: Sun, 22 Jul 2007 00:48:30 +0000 Subject: improved importer used in USB cases. Includes additional metadata -- ctime/mtime/title. Walk of content is now async -- results become available as indexed. Introduced a workaround to dbus.mainloop.run() blocking the other threads; this is only a workaround and not the intended way to fix this issue, but it does allow the other threads to run, which means you'll see indexing in the background. --- diff --git a/bin/datastore-service b/bin/datastore-service index 7dd87ce..2019793 100755 --- a/bin/datastore-service +++ b/bin/datastore-service @@ -49,6 +49,7 @@ ds = DataStore() ds.registerBackend(backingstore.FileBackingStore) ds.registerBackend(backingstore.InplaceFileBackingStore) ds.mount(repo_dir) +ds.complete_indexing() # and run it logger.info("Starting Datastore %s" % (repo_dir)) @@ -63,7 +64,18 @@ signal.signal(signal.SIGHUP, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) def main(): - try: mainloop.run() + # XXX: The context/sleep loop is a work around for what seems + # to be the mainloop blocking in such a way that the indexer + # thread doesn't run until new dbus messages come in... 
+ # I intend to track this down post trial-2 + # really awful + import time + context = mainloop.get_context() + try: + while True: + context.iteration(False) + time.sleep(0.025) + except KeyboardInterrupt: ds.stop() logger.info("DataStore shutdown by user") @@ -72,9 +84,9 @@ def main(): logger.debug("Datastore shutdown with error", exc_info=sys.exc_info()) -#main() +main() -import hotshot -p = hotshot.Profile('hs.prof') -p.run('main()') +#import hotshot +#p = hotshot.Profile('hs.prof') +#p.run('main()') diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index b5b93f9..f317983 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -11,12 +11,14 @@ __copyright__ = 'Copyright ObjectRealms, LLC, 2007' __license__ = 'The GNU Public License V2+' import cPickle as pickle +from datetime import datetime import gnomevfs import os import re import sha import subprocess import time +import threading from olpc.datastore.xapianindex import IndexManager from olpc.datastore import bin_copy @@ -407,7 +409,8 @@ class InplaceFileBackingStore(FileBackingStore): super(InplaceFileBackingStore, self).__init__(uri, **kwargs) # use the original uri self.uri = uri - + self.walker = None + @staticmethod def parse(uri): return uri.startswith("inplace:") @@ -421,7 +424,10 @@ class InplaceFileBackingStore(FileBackingStore): def load(self): super(InplaceFileBackingStore, self).load() # now map/update the existing data into the indexes - self._walk() + # but do it async + self.walker = threading.Thread(target=self._walk) + self.walker.setDaemon(True) + self.walker.start() def _walk(self): # XXX: a version that checked xattr for uid would be simple @@ -433,16 +439,33 @@ class InplaceFileBackingStore(FileBackingStore): if self.base in dirpath: continue if self.STORE_NAME in dirname: dirname.remove(self.STORE_NAME) + + # other files and dirs to blacklist + if '.Trashes' in dirpath: continue + for fn in filenames: + # blacklist 
files + # ignore conventionally hidden files + if fn.startswith("."): continue + source = os.path.join(dirpath, fn) relative = source[len(self.uri)+1:] result, count = self.indexmanager.search(dict(filename=relative)) mime_type = gnomevfs.get_mime_type(source) + stat = os.stat(source) + ctime = datetime.fromtimestamp(stat.st_ctime).isoformat() + mtime = datetime.fromtimestamp(stat.st_mtime).isoformat() + title = os.path.splitext(os.path.split(source)[1])[0] + metadata = dict(filename=relative, + mime_type=mime_type, + ctime=ctime, + mtime=mtime, + title=title) if not count: # create a new record - self.create(dict(filename=relative, mime_type=mime_type), source) + self.create(metadata, source) else: # update the object with the new content iif the # checksum is different @@ -450,14 +473,12 @@ class InplaceFileBackingStore(FileBackingStore): # happen) content = result.next() uid = content.id - # only if the checksum is different - #checksum = self._checksum(source) - #if checksum != content.checksum: - self.update(uid, dict(filename=relative, mime_type=mime_type), source) - - if self.options.get('sync_mount', False): - self.complete_indexing() - + saved_mtime = content.get_property('mtime') + if mtime != saved_mtime: + self.update(uid, metadata, source) + self.indexmanager.flush() + return + # File Management API def create(self, props, filelike): # the file would have already been changed inplace @@ -482,4 +503,13 @@ class InplaceFileBackingStore(FileBackingStore): if path and os.path.exists(path): os.unlink(path) + def stop(self): + if self.walker and self.walker.isAlive(): + self.walker.join() + self.indexmanager.stop() + + def complete_indexing(self): + if self.walker and self.walker.isAlive(): + self.walker.join() + self.indexmanager.complete_indexing() diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py index 8821061..e5baf47 100644 --- a/src/olpc/datastore/converter.py +++ b/src/olpc/datastore/converter.py @@ -75,15 +75,22 @@ class 
subprocessconverter(object): try: cmd = cmd.split() - retcode = subprocess.call(cmd) + # the stderr capture here will hide glib error messages + # from converters which shouldn't be generating output anyway + retcode = subprocess.call(cmd, stderr=subprocess.PIPE) if retcode: return None return codecs.open(target, 'r', 'utf-8') + except UnicodeDecodeError: + # The data was an unknown type but couldn't be understood + # as text so we don't attempt to index it. This most + # likely means its just an unknown binary format. + return None finally: # we unlink the file as its already been opened for # reading if os.path.exists(target): os.unlink(target) - + class noop(object): def verify(self): return True def __call__(self, filename): @@ -108,26 +115,23 @@ class Converter(object): #encoding is passed its the known encoding of the #contents. When None is passed the encoding is guessed which #can result in unexpected or no output. - ext = os.path.splitext(filename)[1] if mimetype: mt = mimetype else: mt = guess_mimetype(filename) maintype, subtype = mt.split('/',1) converter = self._converters.get(mt) if not converter: - converter = self._converters.get(ext) - if not converter: - converter = self._default - # it was an image or an unknown application - if maintype in ['image', 'application', 'audio', 'video'] or \ - subtype in ['x-trash', 'x-python-bytecode',]: - converter = None + converter = self._default + # it was an image or an unknown application + if maintype in ['image', 'application', 'audio', 'video'] or \ + subtype in ['x-trash', 'x-python-bytecode',]: + converter = None + if converter: - try: - return converter(filename) + try: return converter(filename) except: - logging.debug("Binary to Text failed: %s %s %s" % - (ext, mt, filename), exc_info=sys.exc_info()) + logging.debug("Binary to Text failed: %s %s" % + (mt, filename), exc_info=sys.exc_info()) return None @@ -156,7 +160,7 @@ converter.registerConverter('.doc', doctotext) 
converter.registerConverter('application/msword', doctotext) # ODT -odt2txt = subprocessconverter('/usr/local/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s') +odt2txt = subprocessconverter('/usr/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s') converter.registerConverter('.odt', odt2txt) converter.registerConverter('application/vnd.oasis.opendocument.text', odt2txt) diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index a6f8994..3710d68 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -17,7 +17,8 @@ from Queue import Queue, Empty import logging import re import sys - +import time +import thread import threading import warnings @@ -91,10 +92,11 @@ class IndexManager(object): self.read_index = secore.SearchConnection(repo) self.flush() - # by default we start the indexer now self.startIndexer() + assert self.indexer.isAlive() + def bind_to(self, backingstore): # signal from backingstore that its our parent self.backingstore = backingstore @@ -147,7 +149,7 @@ class IndexManager(object): if not filestuff: # In this case we are done return - + self.queue.put((uid, vid, doc, operation, filestuff)) def indexThread(self): @@ -160,8 +162,10 @@ class IndexManager(object): # include timeout here to ease shutdown of the thread # if this is a non-issue we can simply allow it to block try: - uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5) + data = self.queue.get(True, 0.025) + uid, vid, doc, operation, filestuff = data except Empty: + #time.sleep(1.0) continue try: -- cgit v0.9.1