diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-22 00:48:30 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-07-22 00:48:30 (GMT) |
commit | f1f0bbdfb486639812b0e04389feb67b83540b73 (patch) | |
tree | 67c2c4ae3cdf226b0df2a49f55633c4b8d7eebdc /src | |
parent | fd4056b0893b44f6ce13ab0fc94fede440f549a1 (diff) |
improved importer used in USB cases
includes additional metadata -- ctime/mtime/title
walk of content is now async -- results become available as they are indexed
introduced a workaround for dbus.mainloop.run() blocking the other threads
this is only a workaround and not the intended way to fix this issue,
but it does allow the other threads to run, which means you'll see indexing
in the background.
Diffstat (limited to 'src')
-rw-r--r-- | src/olpc/datastore/backingstore.py | 52 | ||||
-rw-r--r-- | src/olpc/datastore/converter.py | 34 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 12 |
3 files changed, 68 insertions, 30 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index b5b93f9..f317983 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -11,12 +11,14 @@ __copyright__ = 'Copyright ObjectRealms, LLC, 2007' __license__ = 'The GNU Public License V2+' import cPickle as pickle +from datetime import datetime import gnomevfs import os import re import sha import subprocess import time +import threading from olpc.datastore.xapianindex import IndexManager from olpc.datastore import bin_copy @@ -407,7 +409,8 @@ class InplaceFileBackingStore(FileBackingStore): super(InplaceFileBackingStore, self).__init__(uri, **kwargs) # use the original uri self.uri = uri - + self.walker = None + @staticmethod def parse(uri): return uri.startswith("inplace:") @@ -421,7 +424,10 @@ class InplaceFileBackingStore(FileBackingStore): def load(self): super(InplaceFileBackingStore, self).load() # now map/update the existing data into the indexes - self._walk() + # but do it async + self.walker = threading.Thread(target=self._walk) + self.walker.setDaemon(True) + self.walker.start() def _walk(self): # XXX: a version that checked xattr for uid would be simple @@ -433,16 +439,33 @@ class InplaceFileBackingStore(FileBackingStore): if self.base in dirpath: continue if self.STORE_NAME in dirname: dirname.remove(self.STORE_NAME) + + # other files and dirs to blacklist + if '.Trashes' in dirpath: continue + for fn in filenames: + # blacklist files + # ignore conventionally hidden files + if fn.startswith("."): continue + source = os.path.join(dirpath, fn) relative = source[len(self.uri)+1:] result, count = self.indexmanager.search(dict(filename=relative)) mime_type = gnomevfs.get_mime_type(source) + stat = os.stat(source) + ctime = datetime.fromtimestamp(stat.st_ctime).isoformat() + mtime = datetime.fromtimestamp(stat.st_mtime).isoformat() + title = os.path.splitext(os.path.split(source)[1])[0] + metadata = dict(filename=relative, + 
mime_type=mime_type, + ctime=ctime, + mtime=mtime, + title=title) if not count: # create a new record - self.create(dict(filename=relative, mime_type=mime_type), source) + self.create(metadata, source) else: # update the object with the new content iif the # checksum is different @@ -450,14 +473,12 @@ class InplaceFileBackingStore(FileBackingStore): # happen) content = result.next() uid = content.id - # only if the checksum is different - #checksum = self._checksum(source) - #if checksum != content.checksum: - self.update(uid, dict(filename=relative, mime_type=mime_type), source) - - if self.options.get('sync_mount', False): - self.complete_indexing() - + saved_mtime = content.get_property('mtime') + if mtime != saved_mtime: + self.update(uid, metadata, source) + self.indexmanager.flush() + return + # File Management API def create(self, props, filelike): # the file would have already been changed inplace @@ -482,4 +503,13 @@ class InplaceFileBackingStore(FileBackingStore): if path and os.path.exists(path): os.unlink(path) + def stop(self): + if self.walker and self.walker.isAlive(): + self.walker.join() + self.indexmanager.stop() + + def complete_indexing(self): + if self.walker and self.walker.isAlive(): + self.walker.join() + self.indexmanager.complete_indexing() diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py index 8821061..e5baf47 100644 --- a/src/olpc/datastore/converter.py +++ b/src/olpc/datastore/converter.py @@ -75,15 +75,22 @@ class subprocessconverter(object): try: cmd = cmd.split() - retcode = subprocess.call(cmd) + # the stderr capture here will hide glib error messages + # from converters which shouldn't be generating output anyway + retcode = subprocess.call(cmd, stderr=subprocess.PIPE) if retcode: return None return codecs.open(target, 'r', 'utf-8') + except UnicodeDecodeError: + # The data was an unknown type but couldn't be understood + # as text so we don't attempt to index it. 
This most + # likely means its just an unknown binary format. + return None finally: # we unlink the file as its already been opened for # reading if os.path.exists(target): os.unlink(target) - + class noop(object): def verify(self): return True def __call__(self, filename): @@ -108,26 +115,23 @@ class Converter(object): #encoding is passed its the known encoding of the #contents. When None is passed the encoding is guessed which #can result in unexpected or no output. - ext = os.path.splitext(filename)[1] if mimetype: mt = mimetype else: mt = guess_mimetype(filename) maintype, subtype = mt.split('/',1) converter = self._converters.get(mt) if not converter: - converter = self._converters.get(ext) - if not converter: - converter = self._default - # it was an image or an unknown application - if maintype in ['image', 'application', 'audio', 'video'] or \ - subtype in ['x-trash', 'x-python-bytecode',]: - converter = None + converter = self._default + # it was an image or an unknown application + if maintype in ['image', 'application', 'audio', 'video'] or \ + subtype in ['x-trash', 'x-python-bytecode',]: + converter = None + if converter: - try: - return converter(filename) + try: return converter(filename) except: - logging.debug("Binary to Text failed: %s %s %s" % - (ext, mt, filename), exc_info=sys.exc_info()) + logging.debug("Binary to Text failed: %s %s" % + (mt, filename), exc_info=sys.exc_info()) return None @@ -156,7 +160,7 @@ converter.registerConverter('.doc', doctotext) converter.registerConverter('application/msword', doctotext) # ODT -odt2txt = subprocessconverter('/usr/local/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s') +odt2txt = subprocessconverter('/usr/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s') converter.registerConverter('.odt', odt2txt) converter.registerConverter('application/vnd.oasis.opendocument.text', odt2txt) diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index 
a6f8994..3710d68 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -17,7 +17,8 @@ from Queue import Queue, Empty import logging import re import sys - +import time +import thread import threading import warnings @@ -91,10 +92,11 @@ class IndexManager(object): self.read_index = secore.SearchConnection(repo) self.flush() - # by default we start the indexer now self.startIndexer() + assert self.indexer.isAlive() + def bind_to(self, backingstore): # signal from backingstore that its our parent self.backingstore = backingstore @@ -147,7 +149,7 @@ class IndexManager(object): if not filestuff: # In this case we are done return - + self.queue.put((uid, vid, doc, operation, filestuff)) def indexThread(self): @@ -160,8 +162,10 @@ class IndexManager(object): # include timeout here to ease shutdown of the thread # if this is a non-issue we can simply allow it to block try: - uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5) + data = self.queue.get(True, 0.025) + uid, vid, doc, operation, filestuff = data except Empty: + #time.sleep(1.0) continue try: |