Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/olpc/datastore/backingstore.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/olpc/datastore/backingstore.py')
-rw-r--r--src/olpc/datastore/backingstore.py204
1 files changed, 179 insertions, 25 deletions
diff --git a/src/olpc/datastore/backingstore.py b/src/olpc/datastore/backingstore.py
index 47e8214..886f34f 100644
--- a/src/olpc/datastore/backingstore.py
+++ b/src/olpc/datastore/backingstore.py
@@ -27,6 +27,11 @@ from olpc.datastore import utils
# changing this pattern impacts _targetFile
filename_attempt_pattern = re.compile('\(\d+\)$')
+import logging
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+logger = logging.getLogger(DS_LOG_CHANNEL)
+logger.setLevel(logging.DEBUG)
+
class BackingStore(object):
"""Backing stores manage stable storage. We abstract out the
management of file/blob storage through this class, as well as the
@@ -110,6 +115,64 @@ class BackingStore(object):
"""
pass
+import time
+class AsyncCopy:
+ CHUNK_SIZE=65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logger.debug("AC: Error writing %s -> %s: wrote less than expected" % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError("Error writing data to destination file"))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ logger.debug("AC: Copied %s -> %s (%d bytes, %ds)" % (self.src, self.dest, self.written, time.time() - self.tstart))
+ self._cleanup()
+ self.completion(None, self.dest)
+ return False
+ except Exception, err:
+ logger.debug("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ logger.debug("AC: will copy %s -> %s (%d bytes)" % (self.src, self.dest, self.size))
+
+ self.tstart = time.time()
+ import gobject
+ sid = gobject.idle_add(self._copy_block)
class FileBackingStore(BackingStore):
""" A backing store that directs maps the storage of content
@@ -338,7 +401,15 @@ class FileBackingStore(BackingStore):
raise ValueError("Content for %s corrupt" % uid)
return content
- def _writeContent(self, uid, filelike, replace=True, target=None):
+ def _writeContent_complete(self, path, completion=None):
+ if completion is None:
+ return path
+ completion(None, path)
+ return None
+
+ def _writeContent(self, uid, filelike, replace=True, can_move=False, target=None,
+ completion=None):
+ """Returns: path of file in datastore (new path if it was copied/moved)"""
content = None
if target: path = target
else:
@@ -350,7 +421,19 @@ class FileBackingStore(BackingStore):
if filelike.name != path:
# protection on inplace stores
- bin_copy.bin_copy(filelike.name, path)
+ if completion is None:
+ bin_copy.bin_copy(filelike.name, path)
+ return path
+
+ if can_move:
+ bin_copy.bin_mv(filelike.name, path)
+ return self._writeContent_complete(path, completion)
+
+ # Otherwise, async copy
+ aco = AsyncCopy(filelike.name, path, completion)
+ aco.start()
+ else:
+ return self._writeContent_complete(path, completion)
def _checksum(self, filename):
c = sha.sha()
@@ -361,15 +444,42 @@ class FileBackingStore(BackingStore):
return c.hexdigest()
# File Management API
- def create(self, props, filelike):
- uid = self.indexmanager.index(props, filelike)
+ def _create_completion(self, uid, props, completion, exc=None, path=None):
+ if exc:
+ completion(exc)
+ return
+ try:
+ # Index the content this time
+ uid = self.indexmanager.index(props, path)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create_async(self, props, filelike, can_move=False, completion=None):
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async create")
+ uid = self.indexmanager.index(props)
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ self._writeContent(uid, filelike, replace=False, can_move=can_move,
+ completion=lambda *args: self._create_completion(uid, props, completion, *args))
+ else:
+ completion(None, uid)
+
+ def create(self, props, filelike, can_move=False):
if filelike:
+ uid = self.indexmanager.index(props, None)
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self._writeContent(uid, filelike, replace=False)
- return uid
+ path = self._writeContent(uid, filelike, replace=False, can_move=can_move)
+ return self.indexmanager.index(props, path)
+ else:
+ return self.indexmanager.index(props)
def get(self, uid, env=None, allowMissing=False, includeFile=False):
content = self.indexmanager.get(uid)
@@ -381,22 +491,49 @@ class FileBackingStore(BackingStore):
fp = open(path, 'r')
# now return a Content object from the model associated with
# this file object
- return self._mapContent(uid, fp, path, env)
+ content = self._mapContent(uid, fp, path, env)
+ if fp:
+ fp.close()
+ return content
- def update(self, uid, props, filelike=None):
+ def _update_completion(self, uid, props, completion, exc=None, path=None):
+ if exc is not None:
+ completion(exc)
+ return
+ try:
+ self.indexmanager.index(props, path)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update_async(self, uid, props, filelike, can_move=False, completion=None):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ if completion is None:
+ raise RuntimeError("Completion must be valid for async update")
if 'uid' not in props: props['uid'] = uid
-
- self.indexmanager.index(props, filelike)
- filename = filelike
+
if filelike:
if isinstance(filelike, basestring):
# lets treat it as a filename
filelike = open(filelike, "r")
filelike.seek(0)
- self.set(uid, filelike)
+ self._writeContent(uid, filelike, can_move=can_move,
+ completion=lambda *args: self._update_completion(uid, props, completion, *args))
+ else:
+ completion()
- def set(self, uid, filelike):
- self._writeContent(uid, filelike)
+ def update(self, uid, props, filelike=None, can_move=False):
+ if 'uid' not in props: props['uid'] = uid
+ if filelike:
+ if isinstance(filelike, basestring):
+ # lets treat it as a filename
+ filelike = open(filelike, "r")
+ filelike.seek(0)
+ path = self._writeContent(uid, filelike, can_move=can_move)
+ self.indexmanager.index(props, path)
+ else:
+ self.indexmanager.index(props)
def delete(self, uid, allowMissing=True):
self.indexmanager.delete(uid)
@@ -527,7 +664,17 @@ class InplaceFileBackingStore(FileBackingStore):
## return open(targetpath, 'rw')
# File Management API
- def create(self, props, filelike):
+ def create_async(self, props, filelike, completion, can_move=False):
+ """Inplace backing store doesn't copy, so no need for async"""
+ if not filelike:
+ raise RuntimeError("Filelike must be valid for async create")
+ try:
+ uid = self.create(props, filelike, can_move)
+ completion(None, uid)
+ except Exception, exc:
+ completion(exc)
+
+ def create(self, props, filelike, can_move=False):
# the file would have already been changed inplace
# don't touch it
proposed_name = None
@@ -547,19 +694,27 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- uid = self.indexmanager.index(props, filelike)
-
+ uid = self.indexmanager.index(props, None)
+ path = filelike
if proposed_name and not os.path.exists(proposed_name):
- self._writeContent(uid, filelike, replace=False, target=proposed_name)
-
- return uid
+ path = self._writeContent(uid, filelike, replace=False, target=proposed_name)
+ return self.indexmanager.index(props, path)
def get(self, uid, env=None, allowMissing=False):
content = self.indexmanager.get(uid)
if not content: raise KeyError(uid)
return content
- def update(self, uid, props, filelike=None):
+ def update_async(self, uid, props, filelike, completion, can_move=False):
+ if filelike is None:
+ raise RuntimeError("Filelike must be valid for async update")
+ try:
+ self.update(uid, props, filelike, can_move)
+ completion()
+ except Exception, exc:
+ completion(exc)
+
+ def update(self, uid, props, filelike=None, can_move=False):
# the file would have already been changed inplace
# don't touch it
props['uid'] = uid
@@ -581,11 +736,10 @@ class InplaceFileBackingStore(FileBackingStore):
props['filename'] = proposed_name
proposed_name = os.path.join(self.uri, proposed_name)
- self.indexmanager.index(props, filelike)
-
+ path = filelike
if proposed_name:
- self._writeContent(uid, filelike, replace=True, target=proposed_name)
-
+ path = self._writeContent(uid, filelike, replace=True, target=proposed_name)
+ self.indexmanager.index(props, path)
def delete(self, uid):
c = self.indexmanager.get(uid)