From 0e38f871622d85b0c4651d0fc64c4693b805eb79 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sat, 08 Sep 2007 01:42:17 +0000 Subject: Copy items asynchronously, support moves instead of copies update() and create() now support a 'transfer_ownership' argument that causes the datastore to move the object, rather than copy it, when it can. When copies must be done, the copy is internally asynchronous when possible to allow the datastore to continue processing requests from clients while the copy is ongoing. --- diff --git a/NEWS b/NEWS index d7dc243..da00958 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,7 @@ +* On update() and create(), copy asynchronously where possible, and + add a new "transfer_ownership" argument to update() and create() that + allows the datastore to move the object rather than copy it (dcbw) + Snapshot a111996299 Snapshot 356b35a278 diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py index 47e8214..886f34f 100644 --- a/src/olpc/datastore/backingstore.py +++ b/src/olpc/datastore/backingstore.py @@ -27,6 +27,11 @@ from olpc.datastore import utils # changing this pattern impacts _targetFile filename_attempt_pattern = re.compile('\(\d+\)$') +import logging +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' +logger = logging.getLogger(DS_LOG_CHANNEL) +logger.setLevel(logging.DEBUG) + class BackingStore(object): """Backing stores manage stable storage. 
We abstract out the management of file/blob storage through this class, as well as the @@ -110,6 +115,64 @@ class BackingStore(object): """ pass +import time +class AsyncCopy: + CHUNK_SIZE=65536 + + def __init__(self, src, dest, completion): + self.src = src + self.dest = dest + self.completion = completion + self.src_fp = -1 + self.dest_fp = -1 + self.written = 0 + self.size = 0 + + def _cleanup(self): + os.close(self.src_fp) + os.close(self.dest_fp) + + def _copy_block(self, user_data=None): + try: + data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE) + count = os.write(self.dest_fp, data) + self.written += len(data) + + # error writing data to file? + if count < len(data): + logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest)) + self._cleanup() + self.completion(RuntimeError("Error writing data to destination file")) + return False + + # FIXME: emit progress here + + # done? + if len(data) < AsyncCopy.CHUNK_SIZE: + logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart)) + self._cleanup() + self.completion(None, self.dest) + return False + except Exception, err: + logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err)) + self._cleanup() + self.completion(err) + return False + + return True + + def start(self): + self.src_fp = os.open(self.src, os.O_RDONLY) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644) + + stat = os.fstat(self.src_fp) + self.size = stat[6] + + logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size)) + + self.tstart = time.time() + import gobject + sid = gobject.idle_add(self._copy_block) class FileBackingStore(BackingStore): """ A backing store that directs maps the storage of content @@ -338,7 +401,15 @@ class FileBackingStore(BackingStore): raise ValueError("Content for %s corrupt" % uid) return content - def _writeContent(self, uid, filelike, replace=True, 
target=None): + def _writeContent_complete(self, path, completion=None): + if completion is None: + return path + completion(None, path) + return None + + def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None, + completion=None): + """Returns: path of file in datastore (new path if it was copied/moved)""" content = None if target: path = target else: @@ -350,7 +421,19 @@ class FileBackingStore(BackingStore): if filelike.name != path: # protection on inplace stores - bin_copy.bin_copy(filelike.name, path) + if completion is None: + bin_copy.bin_copy(filelike.name, path) + return path + + if can_move: + bin_copy.bin_mv(filelike.name, path) + return self._writeContent_complete(path, completion) + + # Otherwise, async copy + aco = AsyncCopy(filelike.name, path, completion) + aco.start() + else: + return self._writeContent_complete(path, completion) def _checksum(self, filename): c = sha.sha() @@ -361,15 +444,42 @@ class FileBackingStore(BackingStore): return c.hexdigest() # File Management API - def create(self, props, filelike): - uid = self.indexmanager.index(props, filelike) + def _create_completion(self, uid, props, completion, exc=None, path=None): + if exc: + completion(exc) + return + try: + # Index the content this time + uid = self.indexmanager.index(props, path) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create_async(self, props, filelike, can_move=False, completion=None): + if completion is None: + raise RuntimeError("Completion must be valid for async create") + uid = self.indexmanager.index(props) + if filelike: + if isinstance(filelike, basestring): + # lets treat it as a filename + filelike = open(filelike, "r") + filelike.seek(0) + self._writeContent(uid, filelike, replace=False, can_move=can_move, + completion=lambda *args: self._create_completion(uid, props, completion, *args)) + else: + completion(None, uid) + + def create(self, props, filelike, can_move=False): if filelike: + uid = 
self.indexmanager.index(props, None) if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self._writeContent(uid, filelike, replace=False) - return uid + path = self._writeContent(uid, filelike, replace=False, can_move=can_move) + return self.indexmanager.index(props, path) + else: + return self.indexmanager.index(props) def get(self, uid, env=None, allowMissing=False, includeFile=False): content = self.indexmanager.get(uid) @@ -381,22 +491,49 @@ class FileBackingStore(BackingStore): fp = open(path, 'r') # now return a Content object from the model associated with # this file object - return self._mapContent(uid, fp, path, env) + content = self._mapContent(uid, fp, path, env) + if fp: + fp.close() + return content - def update(self, uid, props, filelike=None): + def _update_completion(self, uid, props, completion, exc=None, path=None): + if exc is not None: + completion(exc) + return + try: + self.indexmanager.index(props, path) + completion() + except Exception, exc: + completion(exc) + + def update_async(self, uid, props, filelike, can_move=False, completion=None): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + if completion is None: + raise RuntimeError("Completion must be valid for async update") if 'uid' not in props: props['uid'] = uid - - self.indexmanager.index(props, filelike) - filename = filelike + if filelike: if isinstance(filelike, basestring): # lets treat it as a filename filelike = open(filelike, "r") filelike.seek(0) - self.set(uid, filelike) + self._writeContent(uid, filelike, can_move=can_move, + completion=lambda *args: self._update_completion(uid, props, completion, *args)) + else: + completion() - def set(self, uid, filelike): - self._writeContent(uid, filelike) + def update(self, uid, props, filelike=None, can_move=False): + if 'uid' not in props: props['uid'] = uid + if filelike: + if isinstance(filelike, basestring): + # lets treat it 
as a filename + filelike = open(filelike, "r") + filelike.seek(0) + path = self._writeContent(uid, filelike, can_move=can_move) + self.indexmanager.index(props, path) + else: + self.indexmanager.index(props) def delete(self, uid, allowMissing=True): self.indexmanager.delete(uid) @@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore): ## return open(targetpath, 'rw') # File Management API - def create(self, props, filelike): + def create_async(self, props, filelike, completion, can_move=False): + """Inplace backing store doesn't copy, so no need for async""" + if not filelike: + raise RuntimeError("Filelike must be valid for async create") + try: + uid = self.create(props, filelike, can_move) + completion(None, uid) + except Exception, exc: + completion(exc) + + def create(self, props, filelike, can_move=False): # the file would have already been changed inplace # don't touch it proposed_name = None @@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - uid = self.indexmanager.index(props, filelike) - + uid = self.indexmanager.index(props, None) + path = filelike if proposed_name and not os.path.exists(proposed_name): - self._writeContent(uid, filelike, replace=False, target=proposed_name) - - return uid + path = self._writeContent(uid, filelike, replace=False, target=proposed_name) + return self.indexmanager.index(props, path) def get(self, uid, env=None, allowMissing=False): content = self.indexmanager.get(uid) if not content: raise KeyError(uid) return content - def update(self, uid, props, filelike=None): + def update_async(self, uid, props, filelike, completion, can_move=False): + if filelike is None: + raise RuntimeError("Filelike must be valid for async update") + try: + self.update(uid, props, filelike, can_move) + completion() + except Exception, exc: + completion(exc) + + def update(self, uid, props, filelike=None, can_move=False): # the 
file would have already been changed inplace # don't touch it props['uid'] = uid @@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore): props['filename'] = proposed_name proposed_name = os.path.join(self.uri, proposed_name) - self.indexmanager.index(props, filelike) - + path = filelike if proposed_name: - self._writeContent(uid, filelike, replace=True, target=proposed_name) - + path = self._writeContent(uid, filelike, replace=True, target=proposed_name) + self.indexmanager.index(props, path) def delete(self, uid): c = self.indexmanager.get(uid) diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py index 1be1b6b..df28052 100644 --- a/src/olpc/datastore/bin_copy.py +++ b/src/olpc/datastore/bin_copy.py @@ -9,6 +9,13 @@ def bin_copy(src, dest, mode=0600): else: os.chmod(dest, mode) +def bin_mv(src, dest, mode=0600): + try: + subprocess.check_call(['/bin/mv', src, dest]) + except subprocess.CalledProcessError: + raise OSError("Move failed %s %s" % (src, dest)) + else: + os.chmod(dest, mode) if __name__ == "__main__": import sys diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 19342e2..f37bff3 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -183,12 +183,23 @@ class DataStore(dbus.service.Object): mp = self.root return mp + def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None): + logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid)) + if exc is not None: + async_err_cb(exc) + return + + self.Created(uid) + logger.debug("created %s" % uid) + async_cb(uid) + # PUBLIC API #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}s', - out_signature='s') - def create(self, props, filelike=None): + in_signature='a{sv}sb', + out_signature='s', + async_callbacks=('async_cb', 'async_err_cb')) + def create(self, props, filelike=None, transfer_ownership=False, 
async_cb=None, async_err_cb=None): """create a new entry in the datastore. If a file is passed it will be consumed by the datastore. Because the repository has a checkin/checkout model this will create a copy of the file @@ -200,11 +211,8 @@ class DataStore(dbus.service.Object): over this process can come at a later time. """ mp = self._resolveMountpoint(props) - uid = mp.create(props, filelike) - self.Created(uid) - logging.debug("created %s" % uid) - - return uid + mp.create_async(props, filelike, can_move=transfer_ownership, + completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Created(self, uid): pass @@ -300,7 +308,7 @@ class DataStore(dbus.service.Object): order_by = [o.strip() for o in order_by.split(',')] if not isinstance(order_by, list): - logging.debug("bad query, order_by should be a list of property names") + logger.debug("bad query, order_by should be a list of property names") order_by = None # generate a sort function based on the complete set of @@ -406,22 +414,31 @@ class DataStore(dbus.service.Object): results = results.union(result) return results + def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None): + logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc)) + if exc is not None: + async_err_cb(exc) + return + + self.Updated(content.id) + logger.debug("updated %s" % content.id) + async_cb() #@utils.sanitize_dbus @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}s', - out_signature='') - def update(self, uid, props, filelike=None): + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb')) + def update(self, uid, props, filelike=None, transfer_ownership=False, + async_cb=None, async_err_cb=None): """Record the current state of the object checked out for a given uid. If contents have been written to another file for example. 
You must create it """ content = self.get(uid) mountpoint = props.pop('mountpoint', None) - content.backingstore.update(uid, props, filelike) - - self.Updated(content.id) - logger.debug("updated %s" % content.id) + content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership, + completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Updated(self, uid): pass -- cgit v0.9.1