diff options
-rw-r--r-- | NEWS | 4 | ||||
-rw-r--r-- | src/olpc/datastore/backingstore.py | 204 | ||||
-rw-r--r-- | src/olpc/datastore/bin_copy.py | 7 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 49 |
4 files changed, 223 insertions, 41 deletions
@@ -1,3 +1,7 @@ +* On update() and create(), copy asynchronously where possible, and + add new "transfer_ownership" argument to update() and create() that + allow the datastore to move the object rather than copy it (dcbw) + Snapshot a111996299 Snapshot 356b35a278 diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 47e8214..886f34f 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -27,6 +27,11 @@ from olpc.datastore import utils # changing this pattern impacts _targetFile filename_attempt_pattern = re.compile('\(\d+\)$') +import logging +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' +logger = logging.getLogger(DS_LOG_CHANNEL) +logger.setLevel(logging.DEBUG) + class BackingStore(object): """Backing stores manage stable storage. We abstract out the management of file/blob storage through this class, as well as the @@ -110,6 +115,64 @@ class BackingStore(object): """ pass +import time +class AsyncCopy: + CHUNK_SIZE=65536 + + def __init__(self, src, dest, completion): + self.src = src + self.dest = dest + self.completion = completion + self.src_fp = -1 + self.dest_fp = -1 + self.written = 0 + self.size = 0 + + def _cleanup(self): + os.close(self.src_fp) + os.close(self.dest_fp) + + def _copy_block(self, user_data=None): + try: + data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE) + count = os.write(self.dest_fp, data) + self.written += len(data) + + # error writing data to file? + if count < len(data): + logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest)) + self._cleanup() + self.completion(RuntimeError("Error writing data to destination file")) + return False + + # FIXME: emit progress here + + # done? + if len(data) < AsyncCopy.CHUNK_SIZE: + logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart)) + self._cleanup() + self.completion(None, self.dest) + return False + except Exception, err: + logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err)) + self._cleanup() + self.completion(err) + return False + + return True + + def start(self): + self.src_fp = os.open(self.src, os.O_RDONLY) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644) + + stat = os.fstat(self.src_fp) + self.size = stat[6] + + logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size)) + + self.tstart = time.time() + import gobject + sid = gobject.idle_add(self._copy_block) class FileBackingStore(BackingStore): """ A backing store that directs maps the storage of content @@ -338,7 +401,15 @@ class FileBackingStore(BackingStore): raise ValueError("Content for %s corrupt" % uid) return content - def _writeContent(self, uid, filelike, replace=True, target=None): + def _writeContent_complete(self, path, completion=None): + if completion is None: + return path + completion(None, path) + return None + + def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None, + completion=None): + """Returns: path of file in datastore (new path if it was copied/moved)""" content = None if target: path = target else: @@ -350,7 +421,19 @@ class FileBackingStore(BackingStore): if filelike.name != path: # protection on inplace stores - bin_copy.bin_copy(filelike.name, path) + if completion is None: + bin_copy.bin_copy(filelike.name, path) + return path + + if can_move: + bin_copy.bin_mv(filelike.name, path) + return self._writeContent_complete(path, completion) + + # Otherwise, async copy + aco = AsyncCopy(filelike.name, path, completion) + aco.start() + else: + return self._writeContent_complete(path, completion) def _checksum(self, filename): c = sha.sha() @@ -361,15 +444,42 @@ class FileBackingStore(BackingStore): return c.hexdigest() # File Management API - def create(self, props, filelike): - uid = self.indexmanager.index(props, filelike) + def _create_completion(self, uid, props, completion, exc=None, path=None): + if exc: + completion(exc) + return + try: + # Index the content this time + uid = self.indexmanager.index(props, path) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create_async(self, props, filelike, can_move=False, completion=None): + if completion is None: + raise RuntimeError("Completion must be valid for async create") + uid = self.indexmanager.index(props) + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + self._writeContent(uid, filelike, replace=False, can_move=can_move, + completion=lambda *args: self._create_completion(uid, props, completion, *args)) + else: + completion(None, uid) + + def create(self, props, filelike, can_move=False): if filelike: + uid = self.indexmanager.index(props, None) if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self._writeContent(uid, filelike, replace=False) - return uid + path = self._writeContent(uid, filelike, replace=False, can_move=can_move) + return self.indexmanager.index(props, path) + else: + return self.indexmanager.index(props) def get(self, uid, env=None, allowMissing=False, includeFile=False): content = self.indexmanager.get(uid) @@ -381,22 +491,49 @@ class FileBackingStore(BackingStore): fp = open(path, 'r') # now return a Content object from the model associated with # this file object - return self._mapContent(uid, fp, path, env) + content = self._mapContent(uid, fp, path, env) + if fp: + fp.close() + return content - def update(self, uid, props, filelike=None): + def _update_completion(self, uid, props, completion, exc=None, path=None): + if exc is not None: + completion(exc) + return + try: + self.indexmanager.index(props, path) + completion() + except Exception, exc: + completion(exc) + + def update_async(self, uid, props, filelike, can_move=False, completion=None): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + if completion is None: + raise RuntimeError("Completion must be valid for async update") if 'uid' not in props: props['uid'] = uid - - self.indexmanager.index(props, filelike) - filename = filelike + if filelike: if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self.set(uid, filelike) + self._writeContent(uid, filelike, can_move=can_move, + completion=lambda *args: self._update_completion(uid, props, completion, *args)) + else: + completion() - def set(self, uid, filelike): - self._writeContent(uid, filelike) + def update(self, uid, props, filelike=None, can_move=False): + if 'uid' not in props: props['uid'] = uid + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + path = self._writeContent(uid, filelike, can_move=can_move) + self.indexmanager.index(props, path) + else: + self.indexmanager.index(props) def delete(self, uid, allowMissing=True): self.indexmanager.delete(uid) @@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore): ## return open(targetpath, 'rw') # File Management API - def create(self, props, filelike): + def create_async(self, props, filelike, completion, can_move=False): + """Inplace backing store doesn't copy, so no need for async""" + if not filelike: + raise RuntimeError("Filelike must be valid for async create") + try: + uid = self.create(props, filelike, can_move) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create(self, props, filelike, can_move=False): # the file would have already been changed inplace # don't touch it proposed_name = None @@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - uid = self.indexmanager.index(props, filelike) - + uid = self.indexmanager.index(props, None) + path = filelike if proposed_name and not os.path.exists(proposed_name): - self._writeContent(uid, filelike, replace=False, target=proposed_name) - - return uid + path = self._writeContent(uid, filelike, replace=False, target=proposed_name) + return self.indexmanager.index(props, path) def get(self, uid, env=None, allowMissing=False): content = self.indexmanager.get(uid) if not content: raise KeyError(uid) return content - def update(self, uid, props, filelike=None): + def update_async(self, uid, props, filelike, completion, can_move=False): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + try: + self.update(uid, props, filelike, can_move) + completion() + except Exception, exc: + completion(exc) + + def update(self, uid, props, filelike=None, can_move=False): # the file would have already been changed inplace # don't touch it props['uid'] = uid @@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - self.indexmanager.index(props, filelike) - + path = filelike if proposed_name: - self._writeContent(uid, filelike, replace=True, target=proposed_name) - + path = self._writeContent(uid, filelike, replace=True, target=proposed_name) + self.indexmanager.index(props, path) def delete(self, uid): c = self.indexmanager.get(uid) diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py index 1be1b6b..df28052 100644 --- a/src/olpc/datastore/bin_copy.py +++ b/src/olpc/datastore/bin_copy.py @@ -9,6 +9,13 @@ def bin_copy(src, dest, mode=0600): else: os.chmod(dest, mode) +def bin_mv(src, dest, mode=0600): + try: + subprocess.check_call(['/bin/mv', src, dest]) + except subprocess.CalledProcessError: + raise OSError("Move failed %s %s" % (src, dest)) + else: + os.chmod(dest, mode) if __name__ == "__main__": import sys diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 19342e2..f37bff3 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -183,12 +183,23 @@ class DataStore(dbus.service.Object): mp = self.root return mp + def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None): + logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid)) + if exc is not None: + async_err_cb(exc) + return + + self.Created(uid) + logger.debug("created %s" % uid) + async_cb(uid) + # PUBLIC API #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}s', - out_signature='s') - def create(self, props, filelike=None): + in_signature='a{sv}sb', + out_signature='s', + async_callbacks=('async_cb', 'async_err_cb')) + def create(self, props, filelike=None, transfer_ownership=False, async_cb=None, async_err_cb=None): """create a new entry in the datastore. If a file is passed it will be consumed by the datastore. Because the repository has a checkin/checkout model this will create a copy of the file @@ -200,11 +211,8 @@ class DataStore(dbus.service.Object): over this process can come at a later time. """ mp = self._resolveMountpoint(props) - uid = mp.create(props, filelike) - self.Created(uid) - logging.debug("created %s" % uid) - - return uid + mp.create_async(props, filelike, can_move=transfer_ownership, + completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Created(self, uid): pass @@ -300,7 +308,7 @@ class DataStore(dbus.service.Object): order_by = [o.strip() for o in order_by.split(',')] if not isinstance(order_by, list): - logging.debug("bad query, order_by should be a list of property names") + logger.debug("bad query, order_by should be a list of property names") order_by = None # generate a sort function based on the complete set of @@ -406,22 +414,31 @@ class DataStore(dbus.service.Object): results = results.union(result) return results + def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None): + logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc)) + if exc is not None: + async_err_cb(exc) + return + + self.Updated(content.id) + logger.debug("updated %s" % content.id) + async_cb() #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}s', - out_signature='') - def update(self, uid, props, filelike=None): + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb')) + def update(self, uid, props, filelike=None, transfer_ownership=False, + async_cb=None, async_err_cb=None): """Record the current state of the object checked out for a given uid. If contents have been written to another file for example. You must create it """ content = self.get(uid) mountpoint = props.pop('mountpoint', None) - content.backingstore.update(uid, props, filelike) - - self.Updated(content.id) - logger.debug("updated %s" % content.id) + content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership, + completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Updated(self, uid): pass |