Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSascha Silbe <sascha-pgp@silbe.org>2010-06-15 21:37:55 (GMT)
committer Sascha Silbe <sascha-pgp@silbe.org>2010-06-15 21:37:55 (GMT)
commit21208b14bd1784313934adf0f153e133fe1ed81c (patch)
tree7c935414a8e6ceea350f69fc98a5be3fb8c61c3e
parent5ba8a59a93863da720172895ef98069b61d49604 (diff)
feature implemented but not tested, regular test suite works fine
-rw-r--r--src/carquinyol/datastore.py136
-rw-r--r--src/carquinyol/filestore.py19
2 files changed, 136 insertions, 19 deletions
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index a556869..f7befbe 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -19,8 +19,10 @@ import logging
import uuid
import time
import os
+import sys
import traceback
+from decorator import decorator
import dbus
import dbus.service
import gobject
@@ -45,11 +47,25 @@ DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
logger = logging.getLogger(DS_LOG_CHANNEL)
+@decorator
+def queue_if_frozen(method, self, *args, **kwargs):
+ """Decorator to add method invocations to self.queue if self.frozen is set.
+
+ Should only be used to wrap methods that have no return value.
+ """
+ if self.frozen:
+ self.queue.append(lambda: method(self, *args, **kwargs))
+ else:
+ return method(self, *args, **kwargs)
+
+
class DataStore(dbus.service.Object):
"""D-Bus API and logic for connecting all the other components.
"""
def __init__(self, **options):
+ self.frozen = False
+ self.queue = []
bus_name = dbus.service.BusName(DS_SERVICE,
bus=dbus.SessionBus(),
replace_existing=False,
@@ -149,6 +165,7 @@ class DataStore(dbus.service.Object):
out_signature='s',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
+ @queue_if_frozen
def create(self, props, file_path, transfer_ownership,
async_cb, async_err_cb):
uid = str(uuid.uuid4())
@@ -186,6 +203,7 @@ class DataStore(dbus.service.Object):
out_signature='',
async_callbacks=('async_cb', 'async_err_cb'),
byte_arrays=True)
+ @queue_if_frozen
def update(self, uid, props, file_path, transfer_ownership,
async_cb, async_err_cb):
logging.debug('datastore.update %r', uid)
@@ -210,9 +228,14 @@ class DataStore(dbus.service.Object):
pass
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}as',
- out_signature='aa{sv}u')
- def find(self, query, properties):
+ in_signature='a{sv}as', out_signature='aa{sv}u',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ @queue_if_frozen
+ def find(self, query, properties, async_cb, async_err_cb):
+ self._sync_to_dbus_async(self._find, query, properties,
+ async_cb=async_cb, async_err_cb=async_err_cb)
+
+ def _find(self, query, properties):
logging.debug('datastore.find %r', query)
t = time.time()
@@ -259,10 +282,15 @@ class DataStore(dbus.service.Object):
return entries, count
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='s',
- sender_keyword='sender')
- def get_filename(self, uid, sender=None):
+ in_signature='s', out_signature='s',
+ sender_keyword='sender',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ @queue_if_frozen
+ def get_filename(self, uid, async_cb, async_err_cb, sender=None):
+ self._sync_to_dbus_async(self._get_filename, uid, sender,
+ async_cb=async_cb, async_err_cb=async_err_cb)
+
+ def _get_filename(self, uid, sender=None):
logging.debug('datastore.get_filename %r', uid)
user_id = dbus.Bus().get_unix_user(sender)
extension = self._get_extension(uid)
@@ -275,17 +303,27 @@ class DataStore(dbus.service.Object):
return mime.get_primary_extension(mime_type)
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='a{sv}')
- def get_properties(self, uid):
+ in_signature='s', out_signature='a{sv}',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ @queue_if_frozen
+ def get_properties(self, uid, async_cb, async_err_cb):
+ self._sync_to_dbus_async(self._get_properties, uid,
+ async_cb=async_cb, async_err_cb=async_err_cb)
+
+ def _get_properties(self, uid):
logging.debug('datastore.get_properties %r', uid)
metadata = self._metadata_store.retrieve(uid)
return metadata
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}',
- out_signature='as')
- def get_uniquevaluesfor(self, propertyname, query=None):
+ in_signature='sa{sv}', out_signature='as',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ @queue_if_frozen
+ def get_uniquevaluesfor(self, propertyname, query, async_cb, async_err_cb):
+ self._sync_to_dbus_async(self._get_uniquevaluesfor, propertyname,
+ query, async_cb=async_cb, async_err_cb=async_err_cb)
+
+ def _get_uniquevaluesfor(self, propertyname, query):
if propertyname != 'activity':
raise ValueError('Only ''activity'' is a supported property name')
if query:
@@ -297,9 +335,14 @@ class DataStore(dbus.service.Object):
return []
@dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='')
- def delete(self, uid):
+ in_signature='s', out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ @queue_if_frozen
+ def delete(self, uid, async_cb, async_err_cb):
+ self._sync_to_dbus_async(self._delete, uid,
+ async_cb=async_cb, async_err_cb=async_err_cb)
+
+ def _delete(self, uid):
self._optimizer.remove(uid)
self._index_store.delete(uid)
@@ -350,3 +393,64 @@ class DataStore(dbus.service.Object):
@dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
def Unmounted(self, descriptor):
pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='', out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'))
+ def freeze(self, async_cb, async_err_cb):
+ """Close all open files and stop processing requests until thaw.
+
+ Intended for doing online backups.
+ """
+ self.frozen = True
+ self._index_store.close_index()
+ gobject.idle_add(self._freeze_wait_cb, async_cb, async_err_cb,
+ priority=gobject.PRIORITY_LOW)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='', out_signature='')
+ def thaw(self):
+ """Resume normal operation after freeze.
+ """
+ self._index_store.open_index()
+ self.frozen = False
+ gobject.idle_add(self._thaw_work_cb, priority=gobject.PRIORITY_DEFAULT)
+
+ def _freeze_wait_cb(self, async_cb, async_err_cb):
+ """Stall the freeze() method while asynchronous operations are in
+ progress.
+ """
+ if self._file_store.async_running():
+ return True
+
+ async_cb()
+ return False
+
+ def _thaw_work_cb(self, async_cb, async_err_cb):
+ """Process requests that were queued during freeze."""
+ if not self.queue:
+ return False
+
+ self.queue.pop(0)()
+ return True
+
+ def _sync_to_dbus_async(self, method, *args, **kwargs):
+ """Wrap a synchronous method in an asynchronous DBus method.
+
+ The DBus callbacks must be passed as keyword arguments async_cb and
+ async_err_cb.
+ """
+ async_cb = kwargs.pop('async_cb')
+ async_err_cb = kwargs.pop('async_err_cb')
+ try:
+ result = method(*args, **kwargs)
+ except Exception, exception:
+ async_err_cb(exception)
+ return
+
+ if isinstance(result, tuple):
+ async_cb(*result)
+ elif result is None:
+ async_cb()
+ else:
+ async_cb(result)
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index 9724397..c8607c0 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -28,6 +28,9 @@ class FileStore(object):
"""Handle the storage of one file per entry.
"""
+ def __init__(self):
+ self._async_ops = []
+
# TODO: add protection against store and retrieve operations on entries
# that are being processed async.
@@ -76,8 +79,13 @@ class FileStore(object):
"""
logging.debug('FileStore copying from %r to %r', file_path,
destination_path)
- async_copy = AsyncCopy(file_path, destination_path, completion_cb,
- unlink_src)
+ def async_finished_cb(async_copy, error):
+ self._async_ops.remove(async_copy)
+ completion_cb(error)
+
+ async_copy = AsyncCopy(file_path, destination_path, async_finished_cb,
+ unlink_src)
+ self._async_ops.append(async_copy)
async_copy.start()
def retrieve(self, uid, user_id, extension):
@@ -156,6 +164,11 @@ class FileStore(object):
logging.debug('hard linking %r -> %r', new_file, existing_file)
os.link(existing_file, new_file)
+ def async_running(self):
+ """Check whether there are any asynchronous copies in progress.
+ """
+ return bool(self._async_ops)
+
class AsyncCopy(object):
"""Copy a file in chunks in the idle loop.
@@ -209,7 +222,7 @@ class AsyncCopy(object):
self._cleanup()
if self._unlink_src:
os.unlink(self.src)
- self.completion(*args)
+ self.completion(self, *args)
def start(self):
self.src_fp = os.open(self.src, os.O_RDONLY)