diff options
author | Sascha Silbe <sascha-pgp@silbe.org> | 2010-06-15 21:37:55 (GMT) |
---|---|---|
committer | Sascha Silbe <sascha-pgp@silbe.org> | 2010-06-15 21:37:55 (GMT) |
commit | 21208b14bd1784313934adf0f153e133fe1ed81c (patch) | |
tree | 7c935414a8e6ceea350f69fc98a5be3fb8c61c3e | |
parent | 5ba8a59a93863da720172895ef98069b61d49604 (diff) |
feature implemented but not tested, regular test suite works fine
-rw-r--r-- | src/carquinyol/datastore.py | 136 | ||||
-rw-r--r-- | src/carquinyol/filestore.py | 19 |
2 files changed, 136 insertions, 19 deletions
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py index a556869..f7befbe 100644 --- a/src/carquinyol/datastore.py +++ b/src/carquinyol/datastore.py @@ -19,8 +19,10 @@ import logging import uuid import time import os +import sys import traceback +from decorator import decorator import dbus import dbus.service import gobject @@ -45,11 +47,25 @@ DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" logger = logging.getLogger(DS_LOG_CHANNEL) +@decorator +def queue_if_frozen(method, self, *args, **kwargs): + """Decorator to add method invocations to self.queue if self.frozen is set. + + Should only be used to wrap methods that have no return value. + """ + if self.frozen: + self.queue.append(lambda: method(self, *args, **kwargs)) + else: + return method(self, *args, **kwargs) + + class DataStore(dbus.service.Object): """D-Bus API and logic for connecting all the other components. """ def __init__(self, **options): + self.frozen = False + self.queue = [] bus_name = dbus.service.BusName(DS_SERVICE, bus=dbus.SessionBus(), replace_existing=False, @@ -149,6 +165,7 @@ class DataStore(dbus.service.Object): out_signature='s', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) + @queue_if_frozen def create(self, props, file_path, transfer_ownership, async_cb, async_err_cb): uid = str(uuid.uuid4()) @@ -186,6 +203,7 @@ class DataStore(dbus.service.Object): out_signature='', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) + @queue_if_frozen def update(self, uid, props, file_path, transfer_ownership, async_cb, async_err_cb): logging.debug('datastore.update %r', uid) @@ -210,9 +228,14 @@ class DataStore(dbus.service.Object): pass @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}as', - out_signature='aa{sv}u') - def find(self, query, properties): + in_signature='a{sv}as', out_signature='aa{sv}u', + async_callbacks=('async_cb', 'async_err_cb')) + @queue_if_frozen + def find(self, query, properties, async_cb, async_err_cb): + self._sync_to_dbus_async(self._find, query, properties, + async_cb=async_cb, async_err_cb=async_err_cb) + + def _find(self, query, properties): logging.debug('datastore.find %r', query) t = time.time() @@ -259,10 +282,15 @@ class DataStore(dbus.service.Object): return entries, count @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='s', - sender_keyword='sender') - def get_filename(self, uid, sender=None): + in_signature='s', out_signature='s', + sender_keyword='sender', + async_callbacks=('async_cb', 'async_err_cb')) + @queue_if_frozen + def get_filename(self, uid, async_cb, async_err_cb, sender=None): + self._sync_to_dbus_async(self._get_filename, uid, sender, + async_cb=async_cb, async_err_cb=async_err_cb) + + def _get_filename(self, uid, sender=None): logging.debug('datastore.get_filename %r', uid) user_id = dbus.Bus().get_unix_user(sender) extension = self._get_extension(uid) @@ -275,17 +303,27 @@ class DataStore(dbus.service.Object): return mime.get_primary_extension(mime_type) @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='a{sv}') - def get_properties(self, uid): + in_signature='s', out_signature='a{sv}', + async_callbacks=('async_cb', 'async_err_cb')) + @queue_if_frozen + def get_properties(self, uid, async_cb, async_err_cb): + self._sync_to_dbus_async(self._get_properties, uid, + async_cb=async_cb, async_err_cb=async_err_cb) + + def _get_properties(self, uid): logging.debug('datastore.get_properties %r', uid) metadata = self._metadata_store.retrieve(uid) return metadata @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}', - out_signature='as') - def get_uniquevaluesfor(self, propertyname, query=None): + in_signature='sa{sv}', out_signature='as', + async_callbacks=('async_cb', 'async_err_cb')) + @queue_if_frozen + def get_uniquevaluesfor(self, propertyname, query, async_cb, async_err_cb): + self._sync_to_dbus_async(self._get_uniquevaluesfor, propertyname, + query, async_cb=async_cb, async_err_cb=async_err_cb) + + def _get_uniquevaluesfor(self, propertyname, query): if propertyname != 'activity': raise ValueError('Only ''activity'' is a supported property name') if query: @@ -297,9 +335,14 @@ class DataStore(dbus.service.Object): return [] @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='') - def delete(self, uid): + in_signature='s', out_signature='', + async_callbacks=('async_cb', 'async_err_cb')) + @queue_if_frozen + def delete(self, uid, async_cb, async_err_cb): + self._sync_to_dbus_async(self._delete, uid, + async_cb=async_cb, async_err_cb=async_err_cb) + + def _delete(self, uid): self._optimizer.remove(uid) self._index_store.delete(uid) @@ -350,3 +393,64 @@ class DataStore(dbus.service.Object): @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") def Unmounted(self, descriptor): pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='', out_signature='', + async_callbacks=('async_cb', 'async_err_cb')) + def freeze(self, async_cb, async_err_cb): + """Close all open files and stop processing requests until thaw. + + Intended for doing online backups. + """ + self.frozen = True + self._index_store.close_index() + gobject.idle_add(self._freeze_wait_cb, async_cb, async_err_cb, + priority=gobject.PRIORITY_LOW) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='', out_signature='') + def thaw(self): + """Resume normal operation after freeze. + """ + self._index_store.open_index() + self.frozen = False + gobject.idle_add(self._thaw_work_cb, priority=gobject.PRIORITY_DEFAULT) + + def _freeze_wait_cb(self, async_cb, async_err_cb): + """Stall the freeze() method while asynchronous operations are in + progress. + """ + if self._file_store.async_running(): + return True + + async_cb() + return False + + def _thaw_work_cb(self, async_cb, async_err_cb): + """Process requests that were queued during freeze.""" + if not self.queue: + return False + + self.queue.pop(0)() + return True + + def _sync_to_dbus_async(self, method, *args, **kwargs): + """Wrap a synchronous method in an asynchronous DBus method. + + The DBus callbacks must be passed as keyword arguments async_cb and + async_err_cb. + """ + async_cb = kwargs.pop('async_cb') + async_err_cb = kwargs.pop('async_err_cb') + try: + result = method(*args, **kwargs) + except Exception, exception: + async_err_cb(exception) + return + + if isinstance(result, tuple): + async_cb(*result) + elif result is None: + async_cb() + else: + async_cb(result) diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index 9724397..c8607c0 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -28,6 +28,9 @@ class FileStore(object): """Handle the storage of one file per entry. """ + def __init__(self): + self._async_ops = [] + # TODO: add protection against store and retrieve operations on entries # that are being processed async. @@ -76,8 +79,13 @@ class FileStore(object): """ logging.debug('FileStore copying from %r to %r', file_path, destination_path) - async_copy = AsyncCopy(file_path, destination_path, completion_cb, - unlink_src) + def async_finished_cb(async_copy, error): + self._async_ops.remove(async_copy) + completion_cb(error) + + async_copy = AsyncCopy(file_path, destination_path, async_finished_cb, + unlink_src) + self._async_ops.append(async_copy) async_copy.start() def retrieve(self, uid, user_id, extension): @@ -156,6 +164,11 @@ class FileStore(object): logging.debug('hard linking %r -> %r', new_file, existing_file) os.link(existing_file, new_file) + def async_running(self): + """Check whether there are any asynchronous copies in progress. + """ + return bool(self._async_ops) + class AsyncCopy(object): """Copy a file in chunks in the idle loop. @@ -209,7 +222,7 @@ class AsyncCopy(object): self._cleanup() if self._unlink_src: os.unlink(self.src) - self.completion(*args) + self.completion(self, *args) def start(self): self.src_fp = os.open(self.src, os.O_RDONLY) |