Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomeu Vizoso <tomeu@tomeuvizoso.net>2007-10-31 17:43:12 (GMT)
committer Tomeu Vizoso <tomeu@tomeuvizoso.net>2007-10-31 17:43:12 (GMT)
commitf0753fadc877dc8408db6c7d5d54d9ffbc171b75 (patch)
treeebfc7cf04d08b7f9604da41da74ecb63c32c33d2
parent6bb99669eb0606c56ee195473006e219cedb9e81 (diff)
Revert "#4083: Remove the active loop and check usb sticks on idle."
No need to move usb stick checking to the idle event. We can keep using a background thread if we tell glib and dbus to initialize threads. This reverts commit 8f8bbe73fdc61ce3c1716bfc57a21eef7cb2d7e6. Conflicts: NEWS Signed-off-by: Tomeu Vizoso <tomeu@tomeuvizoso.net>
-rw-r--r--NEWS1
-rwxr-xr-xbin/datastore-service25
-rw-r--r--src/olpc/datastore/backingstore.py149
3 files changed, 94 insertions, 81 deletions
diff --git a/NEWS b/NEWS
index a519018..098a22f 100644
--- a/NEWS
+++ b/NEWS
@@ -1,5 +1,4 @@
* #4182: Flush every 20 changes or 1 minute after the last unflushed change. (tomeu)
-* #4083: Remove the active loop and check usb sticks on idle. (tomeu)
Snapshot 5051fc3a4b
diff --git a/bin/datastore-service b/bin/datastore-service
index 8f07458..140f494 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -46,13 +46,12 @@ logging.info("Starting Datastore %s" % (repo_dir))
mainloop = gobject.MainLoop()
def handle_disconnect():
- global mainloop
- mainloop.quit()
- logging.debug("Datastore disconnected from the bus.")
+ global connected
+ connected = False
def handle_shutdown(signum, frame):
- global mainloop
- mainloop.quit()
+ global connected
+ connected = False
raise SystemExit("Shutting down on signal %s" % signum)
bus.set_exit_on_disconnect(False)
@@ -64,17 +63,29 @@ signal.signal(signal.SIGHUP, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
def main():
+ # XXX: The context/sleep loop is a work around for what seems
+ # to be the mainloop blocking in such a way that the indexer
+ # thread doesn't run until new dbus messages come in...
+ # I intend to track this down post trial-2
+ # really awful
+ import time
+ context = mainloop.get_context()
+
if '-m' in sys.argv:
# mount automatically for local testing
ds.mount(repo_dir)
ds.complete_indexing()
try:
- mainloop.run()
+ while connected:
+ context.iteration(False)
+ time.sleep(0.0025)
+
except KeyboardInterrupt:
logging.info("DataStore shutdown by user")
except:
- logging.error("Datastore shutdown with error", exc_info=sys.exc_info())
+ logging.debug("Datastore shutdown with error",
+ exc_info=sys.exc_info())
main()
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 3bff2fe..d37c553 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -22,7 +22,6 @@ import threading
import dbus
import xapian
-import gobject
from olpc.datastore.xapianindex import IndexManager
from olpc.datastore import bin_copy
@@ -175,6 +174,7 @@ class AsyncCopy:
logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
self.tstart = time.time()
+ import gobject
sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
@@ -612,7 +612,7 @@ class InplaceFileBackingStore(FileBackingStore):
super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
# use the original uri
self.uri = uri
- self._walk_source = None
+ self.walker = None
@staticmethod
def parse(uri):
@@ -646,74 +646,76 @@ class InplaceFileBackingStore(FileBackingStore):
# now map/update the existing data into the indexes
# but do it async
- files_to_check = []
+ self.walker = threading.Thread(target=self._walk)
+ self._runWalker = True
+ self.walker.setDaemon(True)
+ self.walker.start()
+
+ def _walk(self):
+ # XXX: a version that checked xattr for uid would be simple
+ # and faster
+ # scan the uri for all non self.base files and update their
+ # records in the db
for dirpath, dirname, filenames in os.walk(self.uri):
- if self.base in dirpath: continue
- if self.STORE_NAME in dirname:
- dirname.remove(self.STORE_NAME)
-
- # blacklist all the hidden directories
- if '/.' in dirpath: continue
-
- for fn in filenames:
- # ignore conventionally hidden files
- if fn.startswith("."):
- continue
- files_to_check.append((dirpath, fn))
-
- self._walk_source = gobject.idle_add(self._walk, files_to_check)
-
- def _walk(self, files_to_check):
- dirpath, fn = files_to_check.pop()
- logging.debug('InplaceFileBackingStore._walk(): %r' % fn)
- try:
- source = os.path.join(dirpath, fn)
- relative = source[len(self.uri)+1:]
-
- result, count = self.indexmanager.search(dict(filename=relative))
- mime_type = gnomevfs.get_mime_type(source)
- stat = os.stat(source)
- ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
- mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
- title = os.path.splitext(os.path.split(source)[1])[0]
- metadata = dict(filename=relative,
- mime_type=mime_type,
- ctime=ctime,
- mtime=mtime,
- title=title)
- if not count:
- # create a new record
- self.create(metadata, source)
- else:
- # update the object with the new content iif the
- # checksum is different
- # XXX: what if there is more than one? (shouldn't
- # happen)
-
- # FIXME This is throwing away all the entry metadata.
- # Disabled for trial-3. We are not doing indexing
- # anyway so it would just update the mtime which is
- # not that useful. Also the journal is currently
- # setting the mime type before saving the file making
- # the mtime check useless.
- #
- # content = result.next()
- # uid = content.id
- # saved_mtime = content.get_property('mtime')
- # if mtime != saved_mtime:
- # self.update(uid, metadata, source)
- pass
-
- self.indexmanager.flush()
-
- except Exception, e:
- logging.exception('Error while processing %r: %r' % (fn, e))
-
- if files_to_check:
- return True
- else:
- self._walk_source = None
- return False
+ try:
+ # see if there is an entry for the filename
+ if self.base in dirpath: continue
+ if self.STORE_NAME in dirname:
+ dirname.remove(self.STORE_NAME)
+
+ # blacklist all the hidden directories
+ if '/.' in dirpath: continue
+
+ for fn in filenames:
+ try:
+ # give the thread a chance to exit
+ if not self._runWalker: break
+ # blacklist files
+ # ignore conventionally hidden files
+ if fn.startswith("."): continue
+
+ source = os.path.join(dirpath, fn)
+ relative = source[len(self.uri)+1:]
+
+ result, count = self.indexmanager.search(dict(filename=relative))
+ mime_type = gnomevfs.get_mime_type(source)
+ stat = os.stat(source)
+ ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
+ mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
+ title = os.path.splitext(os.path.split(source)[1])[0]
+ metadata = dict(filename=relative,
+ mime_type=mime_type,
+ ctime=ctime,
+ mtime=mtime,
+ title=title)
+ if not count:
+ # create a new record
+ self.create(metadata, source)
+ else:
+ # update the object with the new content iif the
+ # checksum is different
+ # XXX: what if there is more than one? (shouldn't
+ # happen)
+
+ # FIXME This is throwing away all the entry metadata.
+ # Disabled for trial-3. We are not doing indexing
+ # anyway so it would just update the mtime which is
+ # not that useful. Also the journal is currently
+ # setting the mime type before saving the file making
+ # the mtime check useless.
+ #
+ # content = result.next()
+ # uid = content.id
+ # saved_mtime = content.get_property('mtime')
+ # if mtime != saved_mtime:
+ # self.update(uid, metadata, source)
+ pass
+ except Exception, e:
+ logging.exception('Error while processing %r: %r' % (fn, e))
+ except Exception, e:
+ logging.exception('Error while indexing mount point %r: %r' % (self.uri, e))
+ self.indexmanager.flush()
+ return
def _translatePath(self, uid):
try: content = self.indexmanager.get(uid)
@@ -847,11 +849,12 @@ class InplaceFileBackingStore(FileBackingStore):
os.unlink(path)
def stop(self):
- if self._walk_source is not None:
- gobject.source_remove(self._walk_source)
+ if self.walker and self.walker.isAlive():
+ # XXX: just force the unmount, flush the index queue
+ self._runWalker = False
self.indexmanager.stop(force=True)
def complete_indexing(self):
- # TODO: Perhaps we should move the inplace indexing to be sync here?
+ if self.walker and self.walker.isAlive():
+ self.walker.join()
self.indexmanager.complete_indexing()
-