author     Tomeu Vizoso <tomeu@tomeuvizoso.net>  2007-10-31 10:30:36 (GMT)
committer  Tomeu Vizoso <tomeu@tomeuvizoso.net>  2007-10-31 10:30:36 (GMT)
commit     8f8bbe73fdc61ce3c1716bfc57a21eef7cb2d7e6 (patch)
tree       19212154e0cacabc3967418f77cf29fff14f7ceb
parent     9e2b0439160a2e24ec451fb97403b4b9b642535a (diff)
#4083: Remove the active loop and check usb sticks on idle.
-rw-r--r--   NEWS                                |   2
-rwxr-xr-x   bin/datastore-service               |  25
-rw-r--r--   src/olpc/datastore/backingstore.py  | 149
3 files changed, 82 insertions, 94 deletions
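
The patch replaces a hand-rolled polling loop with GLib's normal scheduling: the daemon blocks in gobject.MainLoop.run(), and the USB-stick scan is chopped into gobject.idle_add() callbacks that run only when the main loop has nothing else to dispatch. A minimal sketch of the idle-callback contract the patch relies on (PyGTK-era gobject bindings; the names below are illustrative, not from the patch):

    import gobject

    def drain(queue, loop):
        # Idle callbacks fire when no other events are pending.
        # Returning True keeps the source installed; returning False
        # removes it, which is the same contract _walk() uses below.
        print 'processed %r' % queue.pop()
        if queue:
            return True
        loop.quit()
        return False

    loop = gobject.MainLoop()
    gobject.idle_add(drain, ['a', 'b', 'c'], loop)
    loop.run()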
diff --git a/NEWS b/NEWS
index 65954b6..756bf1e 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,5 @@
+* #4083: Remove the active loop and check usb sticks on idle. (tomeu)
+
 Snapshot 5051fc3a4b
 
 Snapshot 26a372063c
diff --git a/bin/datastore-service b/bin/datastore-service
index 140f494..8f07458 100755
--- a/bin/datastore-service
+++ b/bin/datastore-service
@@ -46,12 +46,13 @@ logging.info("Starting Datastore %s" % (repo_dir))
 mainloop = gobject.MainLoop()
 
 def handle_disconnect():
-    global connected
-    connected = False
+    global mainloop
+    mainloop.quit()
+    logging.debug("Datastore disconnected from the bus.")
 
 def handle_shutdown(signum, frame):
-    global connected
-    connected = False
+    global mainloop
+    mainloop.quit()
     raise SystemExit("Shutting down on signal %s" % signum)
 
 bus.set_exit_on_disconnect(False)
@@ -63,29 +64,17 @@ signal.signal(signal.SIGHUP, handle_shutdown)
 signal.signal(signal.SIGTERM, handle_shutdown)
 
 def main():
-    # XXX: The context/sleep loop is a work around for what seems
-    # to be the mainloop blocking in such a way that the indexer
-    # thread doesn't run until new dbus messages come in...
-    # I intend to track this down post trial-2
-    # really awful
-    import time
-    context = mainloop.get_context()
-
     if '-m' in sys.argv:
         # mount automatically for local testing
         ds.mount(repo_dir)
 
     ds.complete_indexing()
 
     try:
-        while connected:
-            context.iteration(False)
-            time.sleep(0.0025)
-
+        mainloop.run()
     except KeyboardInterrupt:
         logging.info("DataStore shutdown by user")
     except:
-        logging.debug("Datastore shutdown with error",
-                      exc_info=sys.exc_info())
+        logging.error("Datastore shutdown with error", exc_info=sys.exc_info())
 
 main()
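
With the iteration()/sleep() loop gone, shutdown becomes event-driven: run() blocks until some handler calls mainloop.quit(). A stripped-down sketch of that pattern (signal wiring only; the D-Bus disconnect hookup stays as in the script above):

    import signal
    import gobject

    mainloop = gobject.MainLoop()

    def handle_shutdown(signum, frame):
        mainloop.quit()  # unblocks mainloop.run() below
        raise SystemExit("Shutting down on signal %s" % signum)

    signal.signal(signal.SIGTERM, handle_shutdown)

    try:
        # Dispatches D-Bus traffic, timeouts and idle callbacks without
        # the 2.5 ms wakeups of the old busy loop.
        mainloop.run()
    except KeyboardInterrupt:
        pass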
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index d37c553..3bff2fe 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -22,6 +22,7 @@ import threading
 
 import dbus
 import xapian
+import gobject
 
 from olpc.datastore.xapianindex import IndexManager
 from olpc.datastore import bin_copy
@@ -174,7 +175,6 @@ class AsyncCopy:
         logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
 
         self.tstart = time.time()
-        import gobject
         sid = gobject.idle_add(self._copy_block)
 
 class FileBackingStore(BackingStore):
@@ -612,7 +612,7 @@ class InplaceFileBackingStore(FileBackingStore):
         super(InplaceFileBackingStore, self).__init__(uri, **kwargs)
         # use the original uri
         self.uri = uri
-        self.walker = None
+        self._walk_source = None
 
     @staticmethod
     def parse(uri):
@@ -646,76 +646,74 @@ class InplaceFileBackingStore(FileBackingStore):
 
         # now map/update the existing data into the indexes
         # but do it async
-        self.walker = threading.Thread(target=self._walk)
-        self._runWalker = True
-        self.walker.setDaemon(True)
-        self.walker.start()
-
-    def _walk(self):
-        # XXX: a version that checked xattr for uid would be simple
-        # and faster
-        # scan the uri for all non self.base files and update their
-        # records in the db
+        files_to_check = []
         for dirpath, dirname, filenames in os.walk(self.uri):
-            try:
-                # see if there is an entry for the filename
-                if self.base in dirpath: continue
-                if self.STORE_NAME in dirname:
-                    dirname.remove(self.STORE_NAME)
-
-                # blacklist all the hidden directories
-                if '/.' in dirpath: continue
-
-                for fn in filenames:
-                    try:
-                        # give the thread a chance to exit
-                        if not self._runWalker: break
-                        # blacklist files
-                        # ignore conventionally hidden files
-                        if fn.startswith("."): continue
-
-                        source = os.path.join(dirpath, fn)
-                        relative = source[len(self.uri)+1:]
-
-                        result, count = self.indexmanager.search(dict(filename=relative))
-                        mime_type = gnomevfs.get_mime_type(source)
-                        stat = os.stat(source)
-                        ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
-                        mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
-                        title = os.path.splitext(os.path.split(source)[1])[0]
-                        metadata = dict(filename=relative,
-                                        mime_type=mime_type,
-                                        ctime=ctime,
-                                        mtime=mtime,
-                                        title=title)
-                        if not count:
-                            # create a new record
-                            self.create(metadata, source)
-                        else:
-                            # update the object with the new content iff the
-                            # checksum is different
-                            # XXX: what if there is more than one? (shouldn't
-                            # happen)
-
-                            # FIXME This is throwing away all the entry metadata.
-                            # Disabled for trial-3. We are not doing indexing
-                            # anyway so it would just update the mtime which is
-                            # not that useful. Also the journal is currently
-                            # setting the mime type before saving the file making
-                            # the mtime check useless.
-                            #
-                            # content = result.next()
-                            # uid = content.id
-                            # saved_mtime = content.get_property('mtime')
-                            # if mtime != saved_mtime:
-                            #     self.update(uid, metadata, source)
-                            pass
-                    except Exception, e:
-                        logging.exception('Error while processing %r: %r' % (fn, e))
-            except Exception, e:
-                logging.exception('Error while indexing mount point %r: %r' % (self.uri, e))
-        self.indexmanager.flush()
-        return
+            if self.base in dirpath: continue
+            if self.STORE_NAME in dirname:
+                dirname.remove(self.STORE_NAME)
+
+            # blacklist all the hidden directories
+            if '/.' in dirpath: continue
+
+            for fn in filenames:
+                # ignore conventionally hidden files
+                if fn.startswith("."):
+                    continue
+                files_to_check.append((dirpath, fn))
+
+        self._walk_source = gobject.idle_add(self._walk, files_to_check)
+
+    def _walk(self, files_to_check):
+        dirpath, fn = files_to_check.pop()
+        logging.debug('InplaceFileBackingStore._walk(): %r' % fn)
+        try:
+            source = os.path.join(dirpath, fn)
+            relative = source[len(self.uri)+1:]
+
+            result, count = self.indexmanager.search(dict(filename=relative))
+            mime_type = gnomevfs.get_mime_type(source)
+            stat = os.stat(source)
+            ctime = datetime.fromtimestamp(stat.st_ctime).isoformat()
+            mtime = datetime.fromtimestamp(stat.st_mtime).isoformat()
+            title = os.path.splitext(os.path.split(source)[1])[0]
+            metadata = dict(filename=relative,
+                            mime_type=mime_type,
+                            ctime=ctime,
+                            mtime=mtime,
+                            title=title)
+            if not count:
+                # create a new record
+                self.create(metadata, source)
+            else:
+                # update the object with the new content iff the
+                # checksum is different
+                # XXX: what if there is more than one? (shouldn't
+                # happen)
+
+                # FIXME This is throwing away all the entry metadata.
+                # Disabled for trial-3. We are not doing indexing
+                # anyway so it would just update the mtime which is
+                # not that useful. Also the journal is currently
+                # setting the mime type before saving the file making
+                # the mtime check useless.
+                #
+                # content = result.next()
+                # uid = content.id
+                # saved_mtime = content.get_property('mtime')
+                # if mtime != saved_mtime:
+                #     self.update(uid, metadata, source)
+                pass
+
+            self.indexmanager.flush()
+
+        except Exception, e:
+            logging.exception('Error while processing %r: %r' % (fn, e))
+
+        if files_to_check:
+            return True
+        else:
+            self._walk_source = None
+            return False
 
     def _translatePath(self, uid):
         try: content = self.indexmanager.get(uid)
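
The new _walk() indexes exactly one file per idle callback and reschedules itself by returning True until files_to_check is empty, so a large stick gets indexed without ever blocking the main loop for long. The same queue-draining shape in isolation (class and method names here are illustrative, not datastore API):

    import gobject

    class IdleIndexer(object):
        def __init__(self, paths):
            # One gobject source drains the whole queue, one item per pass.
            self._source = gobject.idle_add(self._step, paths)

        def _step(self, paths):
            self.index_one(paths.pop())   # real code logs errors and goes on
            if paths:
                return True               # run again on the next idle pass
            self._source = None
            return False                  # queue drained: source is removed

        def index_one(self, path):
            print 'indexing %r' % path

        def stop(self):
            # Like InplaceFileBackingStore.stop(): cancel the pending
            # source so nothing runs after the stick is unmounted.
            if self._source is not None:
                gobject.source_remove(self._source)
                self._source = None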
@@ -849,12 +847,11 @@ class InplaceFileBackingStore(FileBackingStore):
         os.unlink(path)
 
     def stop(self):
-        if self.walker and self.walker.isAlive():
-            # XXX: just force the unmount, flush the index queue
-            self._runWalker = False
+        if self._walk_source is not None:
+            gobject.source_remove(self._walk_source)
         self.indexmanager.stop(force=True)
 
     def complete_indexing(self):
-        if self.walker and self.walker.isAlive():
-            self.walker.join()
+        # TODO: Perhaps we should move the inplace indexing to be sync here?
         self.indexmanager.complete_indexing()
+
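
One way to resolve the TODO in complete_indexing() would be to drain the queue synchronously, reusing _walk()'s one-file-per-call contract. This sketch assumes the store also kept the list in a hypothetical _pending_files attribute (the committed code only hands the list to gobject.idle_add()):

    def complete_indexing(self):
        if self._walk_source is not None:
            gobject.source_remove(self._walk_source)
            self._walk_source = None
            while self._pending_files:
                self._walk(self._pending_files)  # processes one file per call
        self.indexmanager.complete_indexing()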