diff options
author | Benjamin Saller <bcsaller@objectrealms.net> | 2007-08-26 16:46:56 (GMT) |
---|---|---|
committer | Benjamin Saller <bcsaller@objectrealms.net> | 2007-08-26 16:46:56 (GMT) |
commit | 57e90f572b41d40f9601c50e2d7e63bc47a6279a (patch) | |
tree | c76c42019e94ac20378b6dba663272dce805c969 /src | |
parent | f05bfc05d458fe0897c7bb8febab314fcd38e1c1 (diff) | |
parent | f46c8674cd85aef7dc38d7b70a3de7225a884d51 (diff) |
Merge branch 'master' of git+ssh://dev.laptop.org/git/projects/datastore into version_prototype
Conflicts:
src/olpc/datastore/backingstore.py
Diffstat (limited to 'src')
-rw-r--r-- | src/olpc/datastore/backingstore.py | 32 | ||||
-rw-r--r-- | src/olpc/datastore/xapianindex.py | 16 |
2 files changed, 42 insertions, 6 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 87b1ef1..0d04b4a 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -353,7 +353,7 @@ class FileBackingStore(BackingStore): if replace is False and os.path.exists(path): raise KeyError("objects with path:%s for uid:%s exists" %( - path, uid)) + path, uid)) bin_copy.bin_copy(filelike, path) @@ -441,6 +441,7 @@ class InplaceFileBackingStore(FileBackingStore): # now map/update the existing data into the indexes # but do it async self.walker = threading.Thread(target=self._walk) + self._runWalker = True self.walker.setDaemon(True) self.walker.start() @@ -460,6 +461,8 @@ class InplaceFileBackingStore(FileBackingStore): for fn in filenames: + # give the thread a chance to exit + if not self._runWalker: break # blacklist files # ignore conventionally hidden files if fn.startswith("."): continue @@ -534,7 +537,29 @@ class InplaceFileBackingStore(FileBackingStore): # the file would have already been changed inplace # don't touch it props['uid'] = uid + + proposed_name = None + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + # usually with USB drives and the like the file we are + # indexing is already on it, however in the case of moving + # files to these devices we need to detect this case and + # place the file + proposed_name = props.get('filename', None) + if not proposed_name: + proposed_name = os.path.split(filelike.name)[1] + # record the name before qualifying it to the store + props['filename'] = proposed_name + proposed_name = os.path.join(self.uri, proposed_name) + self.indexmanager.index(props, filelike) + + if proposed_name: + self._writeContent(uid, filelike, replace=True, target=proposed_name) + def delete(self, uid): c = self.indexmanager.get(uid) @@ -548,8 +573,9 @@ class InplaceFileBackingStore(FileBackingStore): def stop(self): if self.walker and self.walker.isAlive(): - self.walker.join() - self.indexmanager.stop() + # XXX: just force the unmount, flush the index queue + self._runWalker = False + self.indexmanager.stop(force=True) def complete_indexing(self): if self.walker and self.walker.isAlive(): diff --git a/src/olpc/datastore/xapianindex.py b/src/olpc/datastore/xapianindex.py index 4212ad3..ceb7d56 100644 --- a/src/olpc/datastore/xapianindex.py +++ b/src/olpc/datastore/xapianindex.py @@ -14,6 +14,7 @@ __license__ = 'The GNU Public License V2+' from Queue import Queue, Empty +import gc import logging import os import re @@ -104,11 +105,19 @@ class IndexManager(object): self.backingstore = backingstore - def stop(self): - self.stopIndexer() + def stop(self, force=False): + self.stopIndexer(force) self.write_index.close() self.read_index.close() - + # XXX: work around for xapian not having close() this will + # change in the future in the meantime we delete the + # references to the indexers and then force the gc() to run + # which should inturn trigger the C++ destructor which forces + # the database shut. + self.write_index = None + self.read_index = None + gc.collect() + # Index thread management def startIndexer(self): self.indexer_running = True @@ -120,6 +129,7 @@ class IndexManager(object): if not self.indexer_running: return if not force: self.queue.join() self.indexer_running = False + # should terminate after the current task self.indexer.join() # flow control |