Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2007-09-08 01:42:17 (GMT)
committer Dan Williams <dcbw@redhat.com>2007-09-08 01:42:17 (GMT)
commit0e38f871622d85b0c4651d0fc64c4693b805eb79 (patch)
treee0d45fb335a7f95fac518da8719f1a59c88d4182
parentf46c8674cd85aef7dc38d7b70a3de7225a884d51 (diff)
Copy items asynchronously, support moves instead of copies
update() and create() now support a 'transfer_ownership' argument that causes the datastore to move the object, rather than copy it when it can. When copies must be done, the copy is internally asynchronous when possible to allow the datastore to continue processing requests from clients while the copy is ongoing.
-rw-r--r--NEWS4
-rw-r--r--src/olpc/datastore/backingstore.py204
-rw-r--r--src/olpc/datastore/bin_copy.py7
-rw-r--r--src/olpc/datastore/datastore.py49
4 files changed, 223 insertions, 41 deletions
diff --git a/NEWS b/NEWS
index d7dc243..da00958 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,7 @@
+* On update() and create(), copy asynchronously where possible, and
+ add new "transfer_ownership" argument to update() and create() that
+ allow the datastore to move the object rather than copy it (dcbw)
+
Snapshot a111996299
Snapshot 356b35a278
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 47e8214..886f34f 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,11 @@ from olpc.datastore import utils
# changing this pattern impacts _targetFile
filename_attempt_pattern = re.compile('\(\d+\)$')
+import logging
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+logger = logging.getLogger(DS_LOG_CHANNEL)
+logger.setLevel(logging.DEBUG)
+
class BackingStore(object):
"""Backing stores manage stable storage. We abstract out the
management of file/blob storage through this class, as well as the
@@ -110,6 +115,64 @@ class BackingStore(object):
"""
pass
+import time
+class AsyncCopy:
+ CHUNK_SIZE=65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError("Error writing data to destination file"))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
+ self._cleanup()
+ self.completion(None, self.dest)
+ return False
+ except Exception, err:
+ logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
+
+ self.tstart = time.time()
+ import gobject
+ sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
""" A backing store that directs maps the storage of content
@@ -338,7 +401,15 @@ class FileBackingStore(BackingStore):
raise ValueError("Content for %s corrupt" % uid)
return content
- def _writeContent(self, uid, filelike, replace=True, target=None):
+ def _writeContent_complete(self, path, completion=None):
+ if completion is None:
+ return path
+ completion(None, path)
+ return None
+
+ def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
+ completion=None):
+ """Returns: path of file in datastore (new path if it was copied/moved)"""
content = None
if target: path = target
else:
@@ -350,7 +421,19 @@ class FileBackingStore(BackingStore):
if filelike.name != path:
# protection on inplace stores
- bin_copy.bin_copy(filelike.name, path)
+ if completion is None:
+ bin_copy.bin_copy(filelike.name, path)
+ return path
+
+ if can_move:
+ bin_copy.bin_mv(filelike.name, path)
+ return self._writeContent_complete(path, completion)
+
+ # Otherwise, async copy
+ aco = AsyncCopy(filelike.name, path, completion)
+ aco.start()
+ else:
+ return self._writeContent_complete(path, completion)
def _checksum(self, filename):
c = sha.sha()
@@ -361,15 +444,42 @@ class FileBackingStore(BackingStore):
return c.hexdigest()
# File Management API
- def create(self, props, filelike):
- uid = self.indexmanager.index(props, filelike)
+ def _create_completion(self, uid, props, completion, exc=None, path=None):
+ if exc:
+ completion(exc)
+ return
+ try:
+ # Index the content this time
+ uid = self.indexmanager.index(props, path)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create_async(self, props, filelike, can_move=False, completion=None):
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async create")
+ uid = self.indexmanager.index(props)
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ self._writeContent(uid, filelike, replace=False, can_move=can_move,
+ completion=lambda *args: self._create_completion(uid, props, completion, *args))
+ else:
+ completion(None, uid)
+
+ def create(self, props, filelike, can_move=False):
if filelike:
+ uid = self.indexmanager.index(props, None)
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self._writeContent(uid, filelike, replace=False)
- return uid
+ path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
+ return self.indexmanager.index(props, path)
+ else:
+ return self.indexmanager.index(props)
def get(self, uid, env=None, allowMissing=False, includeFile=False):
content = self.indexmanager.get(uid)
@@ -381,22 +491,49 @@ class FileBackingStore(BackingStore):
fp = open(path, 'r')
# now return a Content object from the model associated with
# this file object
- return self._mapContent(uid, fp, path, env)
+ content = self._mapContent(uid, fp, path, env)
+ if fp:
+ fp.close()
+ return content
- def update(self, uid, props, filelike=None):
+ def _update_completion(self, uid, props, completion, exc=None, path=None):
+ if exc is not None:
+ completion(exc)
+ return
+ try:
+ self.indexmanager.index(props, path)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update_async(self, uid, props, filelike, can_move=False, completion=None):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async update")
if 'uid' not in props: props['uid'] = uid
-
- self.indexmanager.index(props, filelike)
- filename = filelike
+
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self.set(uid, filelike)
+ self._writeContent(uid, filelike, can_move=can_move,
+ completion=lambda *args: self._update_completion(uid, props, completion, *args))
+ else:
+ completion()
- def set(self, uid, filelike):
- self._writeContent(uid, filelike)
+ def update(self, uid, props, filelike=None, can_move=False):
+ if 'uid' not in props: props['uid'] = uid
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ path = self._writeContent(uid, filelike, can_move=can_move)
+ self.indexmanager.index(props, path)
+ else:
+ self.indexmanager.index(props)
def delete(self, uid, allowMissing=True):
self.indexmanager.delete(uid)
@@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore):
## return open(targetpath, 'rw')
# File Management API
- def create(self, props, filelike):
+ def create_async(self, props, filelike, completion, can_move=False):
+ """Inplace backing store doesn't copy, so no need for async"""
+ if not filelike:
+ raise RuntimeError("Filelike must be valid for async create")
+ try:
+ uid = self.create(props, filelike, can_move)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create(self, props, filelike, can_move=False):
# the file would have already been changed inplace
# don't touch it
proposed_name = None
@@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- uid = self.indexmanager.index(props, filelike)
-
+ uid = self.indexmanager.index(props, None)
+ path = filelike
if proposed_name and not os.path.exists(proposed_name):
- self._writeContent(uid, filelike, replace=False, target=proposed_name)
-
- return uid
+ path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
+ return self.indexmanager.index(props, path)
def get(self, uid, env=None, allowMissing=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
return content
- def update(self, uid, props, filelike=None):
+ def update_async(self, uid, props, filelike, completion, can_move=False):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ try:
+ self.update(uid, props, filelike, can_move)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update(self, uid, props, filelike=None, can_move=False):
# the file would have already been changed inplace
# don't touch it
props['uid'] = uid
@@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- self.indexmanager.index(props, filelike)
-
+ path = filelike
if proposed_name:
- self._writeContent(uid, filelike, replace=True, target=proposed_name)
-
+ path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
+ self.indexmanager.index(props, path)
def delete(self, uid):
c = self.indexmanager.get(uid)
diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py
index 1be1b6b..df28052 100644
--- a/src/olpc/datastore/bin_copy.py
+++ b/src/olpc/datastore/bin_copy.py
@@ -9,6 +9,13 @@ def bin_copy(src, dest, mode=0600):
else:
os.chmod(dest, mode)
+def bin_mv(src, dest, mode=0600):
+ try:
+ subprocess.check_call(['/bin/mv', src, dest])
+ except subprocess.CalledProcessError:
+ raise OSError("Move failed %s %s" % (src, dest))
+ else:
+ os.chmod(dest, mode)
if __name__ == "__main__":
import sys
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index 19342e2..f37bff3 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -183,12 +183,23 @@ class DataStore(dbus.service.Object):
mp = self.root
return mp
+ def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None):
+ logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Created(uid)
+ logger.debug("created %s" % uid)
+ async_cb(uid)
+
# PUBLIC API
#@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}s',
- out_signature='s')
- def create(self, props, filelike=None):
+ in_signature='a{sv}sb',
+ out_signature='s',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def create(self, props, filelike=None, transfer_ownership=False, async_cb=None, async_err_cb=None):
"""create a new entry in the datastore. If a file is passed it
will be consumed by the datastore. Because the repository has
a checkin/checkout model this will create a copy of the file
@@ -200,11 +211,8 @@ class DataStore(dbus.service.Object):
over this process can come at a later time.
"""
mp = self._resolveMountpoint(props)
- uid = mp.create(props, filelike)
- self.Created(uid)
- logging.debug("created %s" % uid)
-
- return uid
+ mp.create_async(props, filelike, can_move=transfer_ownership,
+ completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args))
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Created(self, uid): pass
@@ -300,7 +308,7 @@ class DataStore(dbus.service.Object):
order_by = [o.strip() for o in order_by.split(',')]
if not isinstance(order_by, list):
- logging.debug("bad query, order_by should be a list of property names")
+ logger.debug("bad query, order_by should be a list of property names")
order_by = None
# generate a sort function based on the complete set of
@@ -406,22 +414,31 @@ class DataStore(dbus.service.Object):
results = results.union(result)
return results
+ def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None):
+ logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Updated(content.id)
+ logger.debug("updated %s" % content.id)
+ async_cb()
#@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}s',
- out_signature='')
- def update(self, uid, props, filelike=None):
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def update(self, uid, props, filelike=None, transfer_ownership=False,
+ async_cb=None, async_err_cb=None):
"""Record the current state of the object checked out for a
given uid. If contents have been written to another file for
example. You must create it
"""
content = self.get(uid)
mountpoint = props.pop('mountpoint', None)
- content.backingstore.update(uid, props, filelike)
-
- self.Updated(content.id)
- logger.debug("updated %s" % content.id)
+ content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership,
+ completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args))
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Updated(self, uid): pass