Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--NEWS4
-rw-r--r--src/olpc/datastore/backingstore.py204
-rw-r--r--src/olpc/datastore/bin_copy.py7
-rw-r--r--src/olpc/datastore/datastore.py49
4 files changed, 223 insertions, 41 deletions
diff --git a/NEWS b/NEWS
index d7dc243..da00958 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,7 @@
+* On update() and create(), copy asynchronously where possible, and
+ add new "transfer_ownership" argument to update() and create() that
+ allow the datastore to move the object rather than copy it (dcbw)
+
Snapshot a111996299
Snapshot 356b35a278
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 47e8214..886f34f 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,11 @@ from olpc.datastore import utils
# changing this pattern impacts _targetFile
filename_attempt_pattern = re.compile('\(\d+\)$')
+import logging
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+logger = logging.getLogger(DS_LOG_CHANNEL)
+logger.setLevel(logging.DEBUG)
+
class BackingStore(object):
"""Backing stores manage stable storage. We abstract out the
management of file/blob storage through this class, as well as the
@@ -110,6 +115,64 @@ class BackingStore(object):
"""
pass
+import time
+class AsyncCopy:
+ CHUNK_SIZE=65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError("Error writing data to destination file"))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
+ self._cleanup()
+ self.completion(None, self.dest)
+ return False
+ except Exception, err:
+ logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
+
+ self.tstart = time.time()
+ import gobject
+ sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
""" A backing store that directs maps the storage of content
@@ -338,7 +401,15 @@ class FileBackingStore(BackingStore):
raise ValueError("Content for %s corrupt" % uid)
return content
- def _writeContent(self, uid, filelike, replace=True, target=None):
+ def _writeContent_complete(self, path, completion=None):
+ if completion is None:
+ return path
+ completion(None, path)
+ return None
+
+ def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
+ completion=None):
+ """Returns: path of file in datastore (new path if it was copied/moved)"""
content = None
if target: path = target
else:
@@ -350,7 +421,19 @@ class FileBackingStore(BackingStore):
if filelike.name != path:
# protection on inplace stores
- bin_copy.bin_copy(filelike.name, path)
+ if completion is None:
+ bin_copy.bin_copy(filelike.name, path)
+ return path
+
+ if can_move:
+ bin_copy.bin_mv(filelike.name, path)
+ return self._writeContent_complete(path, completion)
+
+ # Otherwise, async copy
+ aco = AsyncCopy(filelike.name, path, completion)
+ aco.start()
+ else:
+ return self._writeContent_complete(path, completion)
def _checksum(self, filename):
c = sha.sha()
@@ -361,15 +444,42 @@ class FileBackingStore(BackingStore):
return c.hexdigest()
# File Management API
- def create(self, props, filelike):
- uid = self.indexmanager.index(props, filelike)
+ def _create_completion(self, uid, props, completion, exc=None, path=None):
+ if exc:
+ completion(exc)
+ return
+ try:
+ # Index the content this time
+ uid = self.indexmanager.index(props, path)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create_async(self, props, filelike, can_move=False, completion=None):
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async create")
+ uid = self.indexmanager.index(props)
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ self._writeContent(uid, filelike, replace=False, can_move=can_move,
+ completion=lambda *args: self._create_completion(uid, props, completion, *args))
+ else:
+ completion(None, uid)
+
+ def create(self, props, filelike, can_move=False):
if filelike:
+ uid = self.indexmanager.index(props, None)
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self._writeContent(uid, filelike, replace=False)
- return uid
+ path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
+ return self.indexmanager.index(props, path)
+ else:
+ return self.indexmanager.index(props)
def get(self, uid, env=None, allowMissing=False, includeFile=False):
content = self.indexmanager.get(uid)
@@ -381,22 +491,49 @@ class FileBackingStore(BackingStore):
fp = open(path, 'r')
# now return a Content object from the model associated with
# this file object
- return self._mapContent(uid, fp, path, env)
+ content = self._mapContent(uid, fp, path, env)
+ if fp:
+ fp.close()
+ return content
- def update(self, uid, props, filelike=None):
+ def _update_completion(self, uid, props, completion, exc=None, path=None):
+ if exc is not None:
+ completion(exc)
+ return
+ try:
+ self.indexmanager.index(props, path)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update_async(self, uid, props, filelike, can_move=False, completion=None):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async update")
if 'uid' not in props: props['uid'] = uid
-
- self.indexmanager.index(props, filelike)
- filename = filelike
+
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self.set(uid, filelike)
+ self._writeContent(uid, filelike, can_move=can_move,
+ completion=lambda *args: self._update_completion(uid, props, completion, *args))
+ else:
+ completion()
- def set(self, uid, filelike):
- self._writeContent(uid, filelike)
+ def update(self, uid, props, filelike=None, can_move=False):
+ if 'uid' not in props: props['uid'] = uid
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ path = self._writeContent(uid, filelike, can_move=can_move)
+ self.indexmanager.index(props, path)
+ else:
+ self.indexmanager.index(props)
def delete(self, uid, allowMissing=True):
self.indexmanager.delete(uid)
@@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore):
## return open(targetpath, 'rw')
# File Management API
- def create(self, props, filelike):
+ def create_async(self, props, filelike, completion, can_move=False):
+ """Inplace backing store doesn't copy, so no need for async"""
+ if not filelike:
+ raise RuntimeError("Filelike must be valid for async create")
+ try:
+ uid = self.create(props, filelike, can_move)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create(self, props, filelike, can_move=False):
# the file would have already been changed inplace
# don't touch it
proposed_name = None
@@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- uid = self.indexmanager.index(props, filelike)
-
+ uid = self.indexmanager.index(props, None)
+ path = filelike
if proposed_name and not os.path.exists(proposed_name):
- self._writeContent(uid, filelike, replace=False, target=proposed_name)
-
- return uid
+ path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
+ return self.indexmanager.index(props, path)
def get(self, uid, env=None, allowMissing=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
return content
- def update(self, uid, props, filelike=None):
+ def update_async(self, uid, props, filelike, completion, can_move=False):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ try:
+ self.update(uid, props, filelike, can_move)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update(self, uid, props, filelike=None, can_move=False):
# the file would have already been changed inplace
# don't touch it
props['uid'] = uid
@@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- self.indexmanager.index(props, filelike)
-
+ path = filelike
if proposed_name:
- self._writeContent(uid, filelike, replace=True, target=proposed_name)
-
+ path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
+ self.indexmanager.index(props, path)
def delete(self, uid):
c = self.indexmanager.get(uid)
diff --git a/src/olpc/datastore/bin_copy.py b/src/olpc/datastore/bin_copy.py
index 1be1b6b..df28052 100644
--- a/src/olpc/datastore/bin_copy.py
+++ b/src/olpc/datastore/bin_copy.py
@@ -9,6 +9,13 @@ def bin_copy(src, dest, mode=0600):
else:
os.chmod(dest, mode)
+def bin_mv(src, dest, mode=0600):
+ try:
+ subprocess.check_call(['/bin/mv', src, dest])
+ except subprocess.CalledProcessError:
+ raise OSError("Move failed %s %s" % (src, dest))
+ else:
+ os.chmod(dest, mode)
if __name__ == "__main__":
import sys
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index 19342e2..f37bff3 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -183,12 +183,23 @@ class DataStore(dbus.service.Object):
mp = self.root
return mp
+ def _create_completion(self, async_cb, async_err_cb, exc=None, uid=None):
+ logger.debug("_create_completion_cb() called with %r / %r, exc %r, uid %r" % (async_cb, async_err_cb, exc, uid))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Created(uid)
+ logger.debug("created %s" % uid)
+ async_cb(uid)
+
# PUBLIC API
#@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}s',
- out_signature='s')
- def create(self, props, filelike=None):
+ in_signature='a{sv}sb',
+ out_signature='s',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def create(self, props, filelike=None, transfer_ownership=False, async_cb=None, async_err_cb=None):
"""create a new entry in the datastore. If a file is passed it
will be consumed by the datastore. Because the repository has
a checkin/checkout model this will create a copy of the file
@@ -200,11 +211,8 @@ class DataStore(dbus.service.Object):
over this process can come at a later time.
"""
mp = self._resolveMountpoint(props)
- uid = mp.create(props, filelike)
- self.Created(uid)
- logging.debug("created %s" % uid)
-
- return uid
+ mp.create_async(props, filelike, can_move=transfer_ownership,
+ completion=lambda *args: self._create_completion(async_cb, async_err_cb, *args))
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Created(self, uid): pass
@@ -300,7 +308,7 @@ class DataStore(dbus.service.Object):
order_by = [o.strip() for o in order_by.split(',')]
if not isinstance(order_by, list):
- logging.debug("bad query, order_by should be a list of property names")
+ logger.debug("bad query, order_by should be a list of property names")
order_by = None
# generate a sort function based on the complete set of
@@ -406,22 +414,31 @@ class DataStore(dbus.service.Object):
results = results.union(result)
return results
+ def _update_completion_cb(self, async_cb, async_err_cb, content, exc=None):
+ logger.debug("_update_completion_cb() called with %r / %r, exc %r" % (async_cb, async_err_cb, exc))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Updated(content.id)
+ logger.debug("updated %s" % content.id)
+ async_cb()
#@utils.sanitize_dbus
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}s',
- out_signature='')
- def update(self, uid, props, filelike=None):
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def update(self, uid, props, filelike=None, transfer_ownership=False,
+ async_cb=None, async_err_cb=None):
"""Record the current state of the object checked out for a
given uid. If contents have been written to another file for
example. You must create it
"""
content = self.get(uid)
mountpoint = props.pop('mountpoint', None)
- content.backingstore.update(uid, props, filelike)
-
- self.Updated(content.id)
- logger.debug("updated %s" % content.id)
+ content.backingstore.update_async(uid, props, filelike, can_move=transfer_ownership,
+ completion=lambda *args: self._update_completion_cb(async_cb, async_err_cb, content, *args))
@dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
def Updated(self, uid): pass