Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/olpc/datastore/backingstore.py
diff options
context:
space:
mode:
authorDan Williams <dcbw@redhat.com>2007-09-08 01:42:17 (GMT)
committer Dan Williams <dcbw@redhat.com>2007-09-08 01:42:17 (GMT)
commit0e38f871622d85b0c4651d0fc64c4693b805eb79 (patch)
treee0d45fb335a7f95fac518da8719f1a59c88d4182 /src/olpc/datastore/backingstore.py
parentf46c8674cd85aef7dc38d7b70a3de7225a884d51 (diff)
Copy items asynchronously, support moves instead of copies
update() and create() now support a 'transfer_ownership' argument that causes the datastore to move the object, rather than copy it when it can. When copies must be done, the copy is internally asynchronous when possible to allow the datastore to continue processing requests from clients while the copy is ongoing.
Diffstat (limited to 'src/olpc/datastore/backingstore.py')
-rw-r--r--src/olpc/datastore/backingstore.py204
1 files changed, 179 insertions, 25 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 47e8214..886f34f 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,11 @@ from olpc.datastore import utils
# changing this pattern impacts _targetFile
filename_attempt_pattern = re.compile('\(\d+\)$')
+import logging
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+logger = logging.getLogger(DS_LOG_CHANNEL)
+logger.setLevel(logging.DEBUG)
+
class BackingStore(object):
"""Backing stores manage stable storage. We abstract out the
management of file/blob storage through this class, as well as the
@@ -110,6 +115,64 @@ class BackingStore(object):
"""
pass
+import time
+class AsyncCopy:
+ CHUNK_SIZE=65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError("Error writing data to destination file"))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
+ self._cleanup()
+ self.completion(None, self.dest)
+ return False
+ except Exception, err:
+ logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
+
+ self.tstart = time.time()
+ import gobject
+ sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
""" A backing store that directs maps the storage of content
@@ -338,7 +401,15 @@ class FileBackingStore(BackingStore):
raise ValueError("Content for %s corrupt" % uid)
return content
- def _writeContent(self, uid, filelike, replace=True, target=None):
+ def _writeContent_complete(self, path, completion=None):
+ if completion is None:
+ return path
+ completion(None, path)
+ return None
+
+ def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
+ completion=None):
+ """Returns: path of file in datastore (new path if it was copied/moved)"""
content = None
if target: path = target
else:
@@ -350,7 +421,19 @@ class FileBackingStore(BackingStore):
if filelike.name != path:
# protection on inplace stores
- bin_copy.bin_copy(filelike.name, path)
+ if completion is None:
+ bin_copy.bin_copy(filelike.name, path)
+ return path
+
+ if can_move:
+ bin_copy.bin_mv(filelike.name, path)
+ return self._writeContent_complete(path, completion)
+
+ # Otherwise, async copy
+ aco = AsyncCopy(filelike.name, path, completion)
+ aco.start()
+ else:
+ return self._writeContent_complete(path, completion)
def _checksum(self, filename):
c = sha.sha()
@@ -361,15 +444,42 @@ class FileBackingStore(BackingStore):
return c.hexdigest()
# File Management API
- def create(self, props, filelike):
- uid = self.indexmanager.index(props, filelike)
+ def _create_completion(self, uid, props, completion, exc=None, path=None):
+ if exc:
+ completion(exc)
+ return
+ try:
+ # Index the content this time
+ uid = self.indexmanager.index(props, path)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create_async(self, props, filelike, can_move=False, completion=None):
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async create")
+ uid = self.indexmanager.index(props)
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ self._writeContent(uid, filelike, replace=False, can_move=can_move,
+ completion=lambda *args: self._create_completion(uid, props, completion, *args))
+ else:
+ completion(None, uid)
+
+ def create(self, props, filelike, can_move=False):
if filelike:
+ uid = self.indexmanager.index(props, None)
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self._writeContent(uid, filelike, replace=False)
- return uid
+ path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
+ return self.indexmanager.index(props, path)
+ else:
+ return self.indexmanager.index(props)
def get(self, uid, env=None, allowMissing=False, includeFile=False):
content = self.indexmanager.get(uid)
@@ -381,22 +491,49 @@ class FileBackingStore(BackingStore):
fp = open(path, 'r')
# now return a Content object from the model associated with
# this file object
- return self._mapContent(uid, fp, path, env)
+ content = self._mapContent(uid, fp, path, env)
+ if fp:
+ fp.close()
+ return content
- def update(self, uid, props, filelike=None):
+ def _update_completion(self, uid, props, completion, exc=None, path=None):
+ if exc is not None:
+ completion(exc)
+ return
+ try:
+ self.indexmanager.index(props, path)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update_async(self, uid, props, filelike, can_move=False, completion=None):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async update")
if 'uid' not in props: props['uid'] = uid
-
- self.indexmanager.index(props, filelike)
- filename = filelike
+
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self.set(uid, filelike)
+ self._writeContent(uid, filelike, can_move=can_move,
+ completion=lambda *args: self._update_completion(uid, props, completion, *args))
+ else:
+ completion()
- def set(self, uid, filelike):
- self._writeContent(uid, filelike)
+ def update(self, uid, props, filelike=None, can_move=False):
+ if 'uid' not in props: props['uid'] = uid
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ path = self._writeContent(uid, filelike, can_move=can_move)
+ self.indexmanager.index(props, path)
+ else:
+ self.indexmanager.index(props)
def delete(self, uid, allowMissing=True):
self.indexmanager.delete(uid)
@@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore):
## return open(targetpath, 'rw')
# File Management API
- def create(self, props, filelike):
+ def create_async(self, props, filelike, completion, can_move=False):
+ """Inplace backing store doesn't copy, so no need for async"""
+ if not filelike:
+ raise RuntimeError("Filelike must be valid for async create")
+ try:
+ uid = self.create(props, filelike, can_move)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create(self, props, filelike, can_move=False):
# the file would have already been changed inplace
# don't touch it
proposed_name = None
@@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- uid = self.indexmanager.index(props, filelike)
-
+ uid = self.indexmanager.index(props, None)
+ path = filelike
if proposed_name and not os.path.exists(proposed_name):
- self._writeContent(uid, filelike, replace=False, target=proposed_name)
-
- return uid
+ path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
+ return self.indexmanager.index(props, path)
def get(self, uid, env=None, allowMissing=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
return content
- def update(self, uid, props, filelike=None):
+ def update_async(self, uid, props, filelike, completion, can_move=False):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ try:
+ self.update(uid, props, filelike, can_move)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update(self, uid, props, filelike=None, can_move=False):
# the file would have already been changed inplace
# don't touch it
props['uid'] = uid
@@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- self.indexmanager.index(props, filelike)
-
+ path = filelike
if proposed_name:
- self._writeContent(uid, filelike, replace=True, target=proposed_name)
-
+ path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
+ self.indexmanager.index(props, path)
def delete(self, uid):
c = self.indexmanager.get(uid)