diff options
author | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2007-10-31 17:43:12 (GMT) |
---|---|---|
committer | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2007-10-31 17:43:12 (GMT) |
commit | f0753fadc877dc8408db6c7d5d54d9ffbc171b75 (patch) | |
tree | ebfc7cf04d08b7f9604da41da74ecb63c32c33d2 | |
parent | 6bb99669eb0606c56ee195473006e219cedb9e81 (diff) |
Revert "#4083: Remove the active loop and check usb sticks on idle."
No need to move usb stick checking to the idle event. We can
keep using a background thread if we tell glib and dbus to
initialize threads.
This reverts commit 8f8bbe73fdc61ce3c1716bfc57a21eef7cb2d7e6.
Conflicts:
NEWS
Signed-off-by: Tomeu Vizoso <tomeu@tomeuvizoso.net>
-rw-r--r-- | NEWS | 1 | ||||
-rwxr-xr-x | bin/datastore-service | 25 | ||||
-rw-r--r-- | src/olpc/datastore/backingstore.py | 149 |
3 files changed, 94 insertions, 81 deletions
@@ -1,5 +1,4 @@ * #4182: Flush every 20 changes or 1 minute after the last unflushed change. (tomeu) -* #4083: Remove the active loop and check usb sticks on idle. (tomeu) Snapshot 5051fc3a4b diff --git a/bin/datastore-service b/bin/datastore-service index 8f07458..140f494 100755 --- a/bin/datastore-service +++ b/bin/datastore-service @@ -46,13 +46,12 @@ logging.info("Starting Datastore %s" % (repo_dir)) mainloop = gobject.MainLoop() def handle_disconnect(): - global mainloop - mainloop.quit() - logging.debug("Datastore disconnected from the bus.") + global connected + connected = False def handle_shutdown(signum, frame): - global mainloop - mainloop.quit() + global connected + connected = False raise SystemExit("Shutting down on signal %s" % signum) bus.set_exit_on_disconnect(False) @@ -64,17 +63,29 @@ signal.signal(signal.SIGHUP, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) def main(): + # XXX: The context/sleep loop is a work around for what seems + # to be the mainloop blocking in such a way that the indexer + # thread doesn't run until new dbus messages come in... + # I intend to track this down post trial-2 + # really awful + import time + context = mainloop.get_context() + if '-m' in sys.argv: # mount automatically for local testing ds.mount(repo_dir) ds.complete_indexing() try: - mainloop.run() + while connected: + context.iteration(False) + time.sleep(0.0025) + except KeyboardInterrupt: logging.info("DataStore shutdown by user") except: - logging.error("Datastore shutdown with error", exc_info=sys.exc_info()) + logging.debug("Datastore shutdown with error", + exc_info=sys.exc_info()) main() diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 3bff2fe..d37c553 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -22,7 +22,6 @@ import threading import dbus import xapian -import gobject from olpc.datastore.xapianindex import IndexManager from olpc.datastore import bin_copy @@ -175,6 +174,7 @@ class AsyncCopy: logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size)) self.tstart = time.time() + import gobject sid = gobject.idle_add(self._copy_block) class FileBackingStore(BackingStore): @@ -612,7 +612,7 @@ class InplaceFileBackingStore(FileBackingStore): super(InplaceFileBackingStore, self).__init__(uri, **kwargs) # use the original uri self.uri = uri - self._walk_source = None + self.walker = None @staticmethod def parse(uri): @@ -646,74 +646,76 @@ class InplaceFileBackingStore(FileBackingStore): # now map/update the existing data into the indexes # but do it async - files_to_check = [] + self.walker = threading.Thread(target=self._walk) + self._runWalker = True + self.walker.setDaemon(True) + self.walker.start() + + def _walk(self): + # XXX: a version that checked xattr for uid would be simple + # and faster + # scan the uri for all non self.base files and update their + # records in the db for dirpath, dirname, filenames in os.walk(self.uri): - if self.base in dirpath: continue - if self.STORE_NAME in dirname: - dirname.remove(self.STORE_NAME) - - # blacklist all the hidden directories - if '/.' in dirpath: continue - - for fn in filenames: - # ignore conventionally hidden files - if fn.startswith("."): - continue - files_to_check.append((dirpath, fn)) - - self._walk_source = gobject.idle_add(self._walk, files_to_check) - - def _walk(self, files_to_check): - dirpath, fn = files_to_check.pop() - logging.debug('InplaceFileBackingStore._walk(): %r' % fn) - try: - source = os.path.join(dirpath, fn) - relative = source[len(self.uri)+1:] - - result, count = self.indexmanager.search(dict(filename=relative)) - mime_type = gnomevfs.get_mime_type(source) - stat = os.stat(source) - ctime = datetime.fromtimestamp(stat.st_ctime).isoformat() - mtime = datetime.fromtimestamp(stat.st_mtime).isoformat() - title = os.path.splitext(os.path.split(source)[1])[0] - metadata = dict(filename=relative, - mime_type=mime_type, - ctime=ctime, - mtime=mtime, - title=title) - if not count: - # create a new record - self.create(metadata, source) - else: - # update the object with the new content iif the - # checksum is different - # XXX: what if there is more than one? (shouldn't - # happen) - - # FIXME This is throwing away all the entry metadata. - # Disabled for trial-3. We are not doing indexing - # anyway so it would just update the mtime which is - # not that useful. Also the journal is currently - # setting the mime type before saving the file making - # the mtime check useless. - # - # content = result.next() - # uid = content.id - # saved_mtime = content.get_property('mtime') - # if mtime != saved_mtime: - # self.update(uid, metadata, source) - pass - - self.indexmanager.flush() - - except Exception, e: - logging.exception('Error while processing %r: %r' % (fn, e)) - - if files_to_check: - return True - else: - self._walk_source = None - return False + try: + # see if there is an entry for the filename + if self.base in dirpath: continue + if self.STORE_NAME in dirname: + dirname.remove(self.STORE_NAME) + + # blacklist all the hidden directories + if '/.' in dirpath: continue + + for fn in filenames: + try: + # give the thread a chance to exit + if not self._runWalker: break + # blacklist files + # ignore conventionally hidden files + if fn.startswith("."): continue + + source = os.path.join(dirpath, fn) + relative = source[len(self.uri)+1:] + + result, count = self.indexmanager.search(dict(filename=relative)) + mime_type = gnomevfs.get_mime_type(source) + stat = os.stat(source) + ctime = datetime.fromtimestamp(stat.st_ctime).isoformat() + mtime = datetime.fromtimestamp(stat.st_mtime).isoformat() + title = os.path.splitext(os.path.split(source)[1])[0] + metadata = dict(filename=relative, + mime_type=mime_type, + ctime=ctime, + mtime=mtime, + title=title) + if not count: + # create a new record + self.create(metadata, source) + else: + # update the object with the new content iif the + # checksum is different + # XXX: what if there is more than one? (shouldn't + # happen) + + # FIXME This is throwing away all the entry metadata. + # Disabled for trial-3. We are not doing indexing + # anyway so it would just update the mtime which is + # not that useful. Also the journal is currently + # setting the mime type before saving the file making + # the mtime check useless. + # + # content = result.next() + # uid = content.id + # saved_mtime = content.get_property('mtime') + # if mtime != saved_mtime: + # self.update(uid, metadata, source) + pass + except Exception, e: + logging.exception('Error while processing %r: %r' % (fn, e)) + except Exception, e: + logging.exception('Error while indexing mount point %r: %r' % (self.uri, e)) + self.indexmanager.flush() + return def _translatePath(self, uid): try: content = self.indexmanager.get(uid) @@ -847,11 +849,12 @@ class InplaceFileBackingStore(FileBackingStore): os.unlink(path) def stop(self): - if self._walk_source is not None: - gobject.source_remove(self._walk_source) + if self.walker and self.walker.isAlive(): + # XXX: just force the unmount, flush the index queue + self._runWalker = False self.indexmanager.stop(force=True) def complete_indexing(self): - # TODO: Perhaps we should move the inplace indexing to be sync here? + if self.walker and self.walker.isAlive(): + self.walker.join() self.indexmanager.complete_indexing() - |