Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Saller <bcsaller@objectrealms.net>2007-07-22 00:48:30 (GMT)
committer Benjamin Saller <bcsaller@objectrealms.net>2007-07-22 00:48:30 (GMT)
commitf1f0bbdfb486639812b0e04389feb67b83540b73 (patch)
tree67c2c4ae3cdf226b0df2a49f55633c4b8d7eebdc
parentfd4056b0893b44f6ce13ab0fc94fede440f549a1 (diff)
improved importer used in USB cases
Includes additional metadata (ctime/mtime/title). The walk of content is now async, so results become available as they are indexed. Introduced a workaround for dbus.mainloop.run() blocking the other threads; this is only a workaround and not the intended way to fix this issue, but it does allow the other threads to run, which means you'll see indexing happen in the background.
-rwxr-xr-xbin/datastore-service22
-rw-r--r--src/olpc/datastore/backingstore.py52
-rw-r--r--src/olpc/datastore/converter.py34
-rw-r--r--src/olpc/datastore/xapianindex.py12
4 files changed, 85 insertions, 35 deletions
diff --git a/bin/datastore-service b/bin/datastore-service
index 7dd87ce..2019793 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -49,6 +49,7 @@ ds = DataStore()
ds.registerBackend(backingstore.FileBackingStore)
ds.registerBackend(backingstore.InplaceFileBackingStore)
ds.mount(repo_dir)
+ds.complete_indexing()
# and run it
logger.info("Starting Datastore %s" % (repo_dir))
@@ -63,7 +64,18 @@ signal.signal(signal.SIGHUP, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
def main():
- try: mainloop.run()
+ # XXX: The context/sleep loop is a work around for what seems
+ # to be the mainloop blocking in such a way that the indexer
+ # thread doesn't run until new dbus messages come in...
+ # I intend to track this down post trial-2
+ # really awful
+ import time
+ context = mainloop.get_context()
+ try:
+ while True:
+ context.iteration(False)
+ time.sleep(0.025)
+
except KeyboardInterrupt:
ds.stop()
logger.info("DataStore shutdown by user")
@@ -72,9 +84,9 @@ def main():
logger.debug("Datastore shutdown with error",
exc_info=sys.exc_info())
-#main()
+main()
-import hotshot
-p = hotshot.Profile('hs.prof')
-p.run('main()')
+#import hotshot
+#p = hotshot.Profile('hs.prof')
+#p.run('main()')
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index b5b93f9..f317983 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -11,12 +11,14 @@ __copyright__ = 'Copyright ObjectRealms, LLC, 2007'
__license__ = 'The GNU Public License V2+'
import cPickle as pickle
+from datetime import datetime
import gnomevfs
import os
import re
import sha
import subprocess
import time
+import threading
from olpc.datastore.xapianindex import IndexManager
from olpc.datastore import bin_copy
@@ -407,7 +409,8 @@ class InplaceFileBackingStore(FileBackingStore):
super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
# use the original uri
self.uri = uri
-
+ self.walker = None
+
@staticmethod
def parse(uri):
return uri.startswith("inplace:")
@@ -421,7 +424,10 @@ class InplaceFileBackingStore(FileBackingStore):
def load(self):
super(InplaceFileBackingStore, self).load()
# now map/update the existing data into the indexes
- self._walk()
+ # but do it async
+ self.walker = threading.Thread(target=self._walk)
+ self.walker.setDaemon(True)
+ self.walker.start()
def _walk(self):
# XXX: a version that checked xattr for uid would be simple
@@ -433,16 +439,33 @@ class InplaceFileBackingStore(FileBackingStore):
if self.base in dirpath: continue
if self.STORE_NAME in dirname:
dirname.remove(self.STORE_NAME)
+
+ # other files and dirs to blacklist
+ if '.Trashes' in dirpath: continue
+
for fn in filenames:
+ # blacklist files
+ # ignore conventionally hidden files
+ if fn.startswith("."): continue
+
source = os.path.join(dirpath, fn)
relative = source[len(self.uri)+1:]
result, count = self.indexmanager.search(dict(filename=relative))
mime_type = gnomevfs.get_mime_type(source)
+ stat = os.stat(source)
+ ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
+ mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
+ title = os.path.splitext(os.path.split(source)[1])[0]
+ metadata = dict(filename=relative,
+ mime_type=mime_type,
+ ctime=ctime,
+ mtime=mtime,
+ title=title)
if not count:
# create a new record
- self.create(dict(filename=relative, mime_type=mime_type), source)
+ self.create(metadata, source)
else:
# update the object with the new content iif the
# checksum is different
@@ -450,14 +473,12 @@ class InplaceFileBackingStore(FileBackingStore):
# happen)
content = result.next()
uid = content.id
- # only if the checksum is different
- #checksum = self._checksum(source)
- #if checksum != content.checksum:
- self.update(uid, dict(filename=relative, mime_type=mime_type), source)
-
- if self.options.get('sync_mount', False):
- self.complete_indexing()
-
+ saved_mtime = content.get_property('mtime')
+ if mtime != saved_mtime:
+ self.update(uid, metadata, source)
+ self.indexmanager.flush()
+ return
+
# File Management API
def create(self, props, filelike):
# the file would have already been changed inplace
@@ -482,4 +503,13 @@ class InplaceFileBackingStore(FileBackingStore):
if path and os.path.exists(path):
os.unlink(path)
+ def stop(self):
+ if self.walker and self.walker.isAlive():
+ self.walker.join()
+ self.indexmanager.stop()
+
+ def complete_indexing(self):
+ if self.walker and self.walker.isAlive():
+ self.walker.join()
+ self.indexmanager.complete_indexing()
diff --git a/src/olpc/datastore/converter.py b/src/olpc/datastore/converter.py
index 8821061..e5baf47 100644
--- a/src/olpc/datastore/converter.py
+++ b/src/olpc/datastore/converter.py
@@ -75,15 +75,22 @@ class subprocessconverter(object):
try:
cmd = cmd.split()
- retcode = subprocess.call(cmd)
+ # the stderr capture here will hide glib error messages
+ # from converters which shouldn't be generating output anyway
+ retcode = subprocess.call(cmd, stderr=subprocess.PIPE)
if retcode: return None
return codecs.open(target, 'r', 'utf-8')
+ except UnicodeDecodeError:
+ # The data was an unknown type but couldn't be understood
+ # as text so we don't attempt to index it. This most
+ # likely means its just an unknown binary format.
+ return None
finally:
# we unlink the file as its already been opened for
# reading
if os.path.exists(target):
os.unlink(target)
-
+
class noop(object):
def verify(self): return True
def __call__(self, filename):
@@ -108,26 +115,23 @@ class Converter(object):
#encoding is passed its the known encoding of the
#contents. When None is passed the encoding is guessed which
#can result in unexpected or no output.
- ext = os.path.splitext(filename)[1]
if mimetype: mt = mimetype
else: mt = guess_mimetype(filename)
maintype, subtype = mt.split('/',1)
converter = self._converters.get(mt)
if not converter:
- converter = self._converters.get(ext)
- if not converter:
- converter = self._default
- # it was an image or an unknown application
- if maintype in ['image', 'application', 'audio', 'video'] or \
- subtype in ['x-trash', 'x-python-bytecode',]:
- converter = None
+ converter = self._default
+ # it was an image or an unknown application
+ if maintype in ['image', 'application', 'audio', 'video'] or \
+ subtype in ['x-trash', 'x-python-bytecode',]:
+ converter = None
+
if converter:
- try:
- return converter(filename)
+ try: return converter(filename)
except:
- logging.debug("Binary to Text failed: %s %s %s" %
- (ext, mt, filename), exc_info=sys.exc_info())
+ logging.debug("Binary to Text failed: %s %s" %
+ (mt, filename), exc_info=sys.exc_info())
return None
@@ -156,7 +160,7 @@ converter.registerConverter('.doc', doctotext)
converter.registerConverter('application/msword', doctotext)
# ODT
-odt2txt = subprocessconverter('/usr/local/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s')
+odt2txt = subprocessconverter('/usr/bin/odt2txt --encoding=UTF-8 --output=%(target)s %(source)s')
converter.registerConverter('.odt', odt2txt)
converter.registerConverter('application/vnd.oasis.opendocument.text', odt2txt)
diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py
index a6f8994..3710d68 100644
--- a/src/olpc/datastore/xapianindex.py
+++ b/src/olpc/datastore/xapianindex.py
@@ -17,7 +17,8 @@ from Queue import Queue, Empty
import logging
import re
import sys
-
+import time
+import thread
import threading
import warnings
@@ -91,10 +92,11 @@ class IndexManager(object):
self.read_index = secore.SearchConnection(repo)
self.flush()
-
# by default we start the indexer now
self.startIndexer()
+ assert self.indexer.isAlive()
+
def bind_to(self, backingstore):
# signal from backingstore that its our parent
self.backingstore = backingstore
@@ -147,7 +149,7 @@ class IndexManager(object):
if not filestuff:
# In this case we are done
return
-
+
self.queue.put((uid, vid, doc, operation, filestuff))
def indexThread(self):
@@ -160,8 +162,10 @@ class IndexManager(object):
# include timeout here to ease shutdown of the thread
# if this is a non-issue we can simply allow it to block
try:
- uid, vid, doc, operation, filestuff = self.queue.get(timeout=0.5)
+ data = self.queue.get(True, 0.025)
+ uid, vid, doc, operation, filestuff = data
except Empty:
+ #time.sleep(1.0)
continue
try: