diff options
Diffstat (limited to 'src/olpc/datastore/backingstore.py')
-rw-r--r-- | src/olpc/datastore/backingstore.py | 204 |
1 files changed, 179 insertions, 25 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 47e8214..886f34f 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -27,6 +27,11 @@ from olpc.datastore import utils # changing this pattern impacts _targetFile filename_attempt_pattern = re.compile('\(\d+\)$') +import logging +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' +logger = logging.getLogger(DS_LOG_CHANNEL) +logger.setLevel(logging.DEBUG) + class BackingStore(object): """Backing stores manage stable storage. We abstract out the management of file/blob storage through this class, as well as the @@ -110,6 +115,64 @@ class BackingStore(object): """ pass +import time +class AsyncCopy: + CHUNK_SIZE=65536 + + def __init__(self, src, dest, completion): + self.src = src + self.dest = dest + self.completion = completion + self.src_fp = -1 + self.dest_fp = -1 + self.written = 0 + self.size = 0 + + def _cleanup(self): + os.close(self.src_fp) + os.close(self.dest_fp) + + def _copy_block(self, user_data=None): + try: + data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE) + count = os.write(self.dest_fp, data) + self.written += len(data) + + # error writing data to file? + if count < len(data): + logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest)) + self._cleanup() + self.completion(RuntimeError("Error writing data to destination file")) + return False + + # FIXME: emit progress here + + # done? + if len(data) < AsyncCopy.CHUNK_SIZE: + logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart)) + self._cleanup() + self.completion(None, self.dest) + return False + except Exception, err: + logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err)) + self._cleanup() + self.completion(err) + return False + + return True + + def start(self): + self.src_fp = os.open(self.src, os.O_RDONLY) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644) + + stat = os.fstat(self.src_fp) + self.size = stat[6] + + logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size)) + + self.tstart = time.time() + import gobject + sid = gobject.idle_add(self._copy_block) class FileBackingStore(BackingStore): """ A backing store that directs maps the storage of content @@ -338,7 +401,15 @@ class FileBackingStore(BackingStore): raise ValueError("Content for %s corrupt" % uid) return content - def _writeContent(self, uid, filelike, replace=True, target=None): + def _writeContent_complete(self, path, completion=None): + if completion is None: + return path + completion(None, path) + return None + + def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None, + completion=None): + """Returns: path of file in datastore (new path if it was copied/moved)""" content = None if target: path = target else: @@ -350,7 +421,19 @@ class FileBackingStore(BackingStore): if filelike.name != path: # protection on inplace stores - bin_copy.bin_copy(filelike.name, path) + if completion is None: + bin_copy.bin_copy(filelike.name, path) + return path + + if can_move: + bin_copy.bin_mv(filelike.name, path) + return self._writeContent_complete(path, completion) + + # Otherwise, async copy + aco = AsyncCopy(filelike.name, path, completion) + aco.start() + else: + return self._writeContent_complete(path, completion) def _checksum(self, filename): c = sha.sha() @@ -361,15 +444,42 @@ class FileBackingStore(BackingStore): return c.hexdigest() # File Management API - def create(self, props, filelike): - uid = self.indexmanager.index(props, filelike) + def _create_completion(self, uid, props, completion, exc=None, path=None): + if exc: + completion(exc) + return + try: + # Index the content this time + uid = self.indexmanager.index(props, path) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create_async(self, props, filelike, can_move=False, completion=None): + if completion is None: + raise RuntimeError("Completion must be valid for async create") + uid = self.indexmanager.index(props) + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + self._writeContent(uid, filelike, replace=False, can_move=can_move, + completion=lambda *args: self._create_completion(uid, props, completion, *args)) + else: + completion(None, uid) + + def create(self, props, filelike, can_move=False): if filelike: + uid = self.indexmanager.index(props, None) if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self._writeContent(uid, filelike, replace=False) - return uid + path = self._writeContent(uid, filelike, replace=False, can_move=can_move) + return self.indexmanager.index(props, path) + else: + return self.indexmanager.index(props) def get(self, uid, env=None, allowMissing=False, includeFile=False): content = self.indexmanager.get(uid) @@ -381,22 +491,49 @@ class FileBackingStore(BackingStore): fp = open(path, 'r') # now return a Content object from the model associated with # this file object - return self._mapContent(uid, fp, path, env) + content = self._mapContent(uid, fp, path, env) + if fp: + fp.close() + return content - def update(self, uid, props, filelike=None): + def _update_completion(self, uid, props, completion, exc=None, path=None): + if exc is not None: + completion(exc) + return + try: + self.indexmanager.index(props, path) + completion() + except Exception, exc: + completion(exc) + + def update_async(self, uid, props, filelike, can_move=False, completion=None): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + if completion is None: + raise RuntimeError("Completion must be valid for async update") if 'uid' not in props: props['uid'] = uid - - self.indexmanager.index(props, filelike) - filename = filelike + if filelike: if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self.set(uid, filelike) + self._writeContent(uid, filelike, can_move=can_move, + completion=lambda *args: self._update_completion(uid, props, completion, *args)) + else: + completion() - def set(self, uid, filelike): - self._writeContent(uid, filelike) + def update(self, uid, props, filelike=None, can_move=False): + if 'uid' not in props: props['uid'] = uid + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + path = self._writeContent(uid, filelike, can_move=can_move) + self.indexmanager.index(props, path) + else: + self.indexmanager.index(props) def delete(self, uid, allowMissing=True): self.indexmanager.delete(uid) @@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore): ## return open(targetpath, 'rw') # File Management API - def create(self, props, filelike): + def create_async(self, props, filelike, completion, can_move=False): + """Inplace backing store doesn't copy, so no need for async""" + if not filelike: + raise RuntimeError("Filelike must be valid for async create") + try: + uid = self.create(props, filelike, can_move) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create(self, props, filelike, can_move=False): # the file would have already been changed inplace # don't touch it proposed_name = None @@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - uid = self.indexmanager.index(props, filelike) - + uid = self.indexmanager.index(props, None) + path = filelike if proposed_name and not os.path.exists(proposed_name): - self._writeContent(uid, filelike, replace=False, target=proposed_name) - - return uid + path = self._writeContent(uid, filelike, replace=False, target=proposed_name) + return self.indexmanager.index(props, path) def get(self, uid, env=None, allowMissing=False): content = self.indexmanager.get(uid) if not content: raise KeyError(uid) return content - def update(self, uid, props, filelike=None): + def update_async(self, uid, props, filelike, completion, can_move=False): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + try: + self.update(uid, props, filelike, can_move) + completion() + except Exception, exc: + completion(exc) + + def update(self, uid, props, filelike=None, can_move=False): # the file would have already been changed inplace # don't touch it props['uid'] = uid @@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - self.indexmanager.index(props, filelike) - + path = filelike if proposed_name: - self._writeContent(uid, filelike, replace=True, target=proposed_name) - + path = self._writeContent(uid, filelike, replace=True, target=proposed_name) + self.indexmanager.index(props, path) def delete(self, uid): c = self.indexmanager.get(uid) |