diff options
author | Marco Pesenti Gritti <marco@marcopg.org> | 2010-08-22 02:45:43 (GMT) |
---|---|---|
committer | Marco Pesenti Gritti <marco@marcopg.org> | 2010-08-22 02:45:43 (GMT) |
commit | 192ad89c9ade5dd9c38cfe3280666f3a2c7c17d3 (patch) | |
tree | 2ab249ebe5ab915fc48231789d4cd1c4d5be18eb /datastore/src/carquinyol/datastore.py | |
parent | 415b048faad644549d270817b5fb8910c2752bdb (diff) | |
parent | 0c3d1b3aaeb0ca69693aa325e32e143a9fae047f (diff) |
Merge datastore module in a subdirectory
Diffstat (limited to 'datastore/src/carquinyol/datastore.py')
-rw-r--r-- | datastore/src/carquinyol/datastore.py | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/datastore/src/carquinyol/datastore.py b/datastore/src/carquinyol/datastore.py new file mode 100644 index 0000000..82a6207 --- /dev/null +++ b/datastore/src/carquinyol/datastore.py @@ -0,0 +1,354 @@ +# Copyright (C) 2008, One Laptop Per Child +# Based on code Copyright (C) 2007, ObjectRealms, LLC +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging +import uuid +import time +import os +import traceback + +import dbus +import dbus.service +import gobject + +from sugar import mime + +from carquinyol import layoutmanager +from carquinyol import migration +from carquinyol.layoutmanager import MAX_QUERY_LIMIT +from carquinyol.metadatastore import MetadataStore +from carquinyol.indexstore import IndexStore +from carquinyol.filestore import FileStore +from carquinyol.optimizer import Optimizer + +# the name used by the logger +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' + +DS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" +DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" + +logger = logging.getLogger(DS_LOG_CHANNEL) + + +class DataStore(dbus.service.Object): + """D-Bus API and logic for connecting all the other components. + """ + + def __init__(self, **options): + bus_name = dbus.service.BusName(DS_SERVICE, + bus=dbus.SessionBus(), + replace_existing=False, + allow_replacement=False) + dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) + + migrated = self._migrate() + + self._metadata_store = MetadataStore() + self._file_store = FileStore() + self._optimizer = Optimizer(self._file_store, self._metadata_store) + self._index_store = IndexStore() + self._index_updating = False + + if migrated: + self._rebuild_index() + return + + try: + self._index_store.open_index() + except Exception: + logging.exception('Failed to open index, will rebuild') + self._rebuild_index() + return + + if not self._index_store.index_updated: + logging.debug('Index is not up-to-date, will update') + self._update_index() + + def _migrate(self): + """Check version of data store on disk and migrate if necessary. + + Returns True if migration was done and an index rebuild is required, + False otherwise. + """ + layout_manager = layoutmanager.get_instance() + old_version = layout_manager.get_version() + if old_version == layoutmanager.CURRENT_LAYOUT_VERSION: + return False + + if old_version == 0: + migration.migrate_from_0() + + layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION) + return True + + def _rebuild_index(self): + """Remove and recreate index.""" + self._index_store.close_index() + self._index_store.remove_index() + self._index_store.open_index() + self._update_index() + + def _update_index(self): + """Find entries that are not yet in the index and add them.""" + uids = layoutmanager.get_instance().find_all() + logging.debug('Going to update the index with object_ids %r', + uids) + self._index_updating = True + gobject.idle_add(lambda: self.__update_index_cb(uids), + priority=gobject.PRIORITY_LOW) + + def __update_index_cb(self, uids): + if uids: + uid = uids.pop() + + logging.debug('Updating entry %r in index. %d to go.', uid, + len(uids)) + + if not self._index_store.contains(uid): + try: + props = self._metadata_store.retrieve(uid) + self._index_store.store(uid, props) + except Exception: + logging.exception('Error processing %r', uid) + + if not uids: + self._index_store.flush() + self._index_updating = False + logging.debug('Finished updating index.') + return False + else: + return True + + def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb, + async_err_cb, uid, exc) + if exc is not None: + async_err_cb(exc) + return + + self.Created(uid) + self._optimizer.optimize(uid) + logger.debug('created %s', uid) + async_cb(uid) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}sb', + out_signature='s', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def create(self, props, file_path, transfer_ownership, + async_cb, async_err_cb): + uid = str(uuid.uuid4()) + logging.debug('datastore.create %r', uid) + + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + + self._metadata_store.store(uid, props) + self._index_store.store(uid, props) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._create_completion_cb(async_cb, + async_err_cb, + uid, + *args)) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Created(self, uid): + pass + + def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug('_update_completion_cb() called with %r / %r, exc %r', + async_cb, async_err_cb, exc) + if exc is not None: + async_err_cb(exc) + return + + self.Updated(uid) + self._optimizer.optimize(uid) + logger.debug('updated %s', uid) + async_cb() + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def update(self, uid, props, file_path, transfer_ownership, + async_cb, async_err_cb): + logging.debug('datastore.update %r', uid) + + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + + self._metadata_store.store(uid, props) + self._index_store.store(uid, props) + + if os.path.exists(self._file_store.get_file_path(uid)) and \ + (not file_path or os.path.exists(file_path)): + self._optimizer.remove(uid) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._update_completion_cb(async_cb, + async_err_cb, + uid, + *args)) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Updated(self, uid): + pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}as', + out_signature='aa{sv}u') + def find(self, query, properties): + logging.debug('datastore.find %r', query) + t = time.time() + + if not self._index_updating: + try: + uids, count = self._index_store.find(query) + except Exception: + logging.exception('Failed to query index, will rebuild') + self._rebuild_index() + + if self._index_updating: + logging.warning('Index updating, returning all entries') + return self._find_all(query, properties) + + entries = [] + for uid in uids: + entry_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(entry_path): + logging.warning( + 'Inconsistency detected, returning all entries') + self._rebuild_index() + return self._find_all(query, properties) + + metadata = self._metadata_store.retrieve(uid, properties) + entries.append(metadata) + + logger.debug('find(): %r', time.time() - t) + + return entries, count + + def _find_all(self, query, properties): + uids = layoutmanager.get_instance().find_all() + count = len(uids) + + offset = query.get('offset', 0) + limit = query.get('limit', MAX_QUERY_LIMIT) + uids = uids[offset:offset + limit] + + entries = [] + for uid in uids: + metadata = self._metadata_store.retrieve(uid, properties) + entries.append(metadata) + + return entries, count + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='s', + sender_keyword='sender') + def get_filename(self, uid, sender=None): + logging.debug('datastore.get_filename %r', uid) + user_id = dbus.Bus().get_unix_user(sender) + extension = self._get_extension(uid) + return self._file_store.retrieve(uid, user_id, extension) + + def _get_extension(self, uid): + mime_type = self._metadata_store.get_property(uid, 'mime_type') + if mime_type is None or not mime_type: + return '' + return mime.get_primary_extension(mime_type) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='a{sv}') + def get_properties(self, uid): + logging.debug('datastore.get_properties %r', uid) + metadata = self._metadata_store.retrieve(uid) + return metadata + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}', + out_signature='as') + def get_uniquevaluesfor(self, propertyname, query=None): + if propertyname != 'activity': + raise ValueError('Only ''activity'' is a supported property name') + if query: + raise ValueError('The query parameter is not supported') + if not self._index_updating: + return self._index_store.get_activities() + else: + logging.warning('Index updating, returning an empty list') + return [] + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='') + def delete(self, uid): + self._optimizer.remove(uid) + + self._index_store.delete(uid) + self._file_store.delete(uid) + self._metadata_store.delete(uid) + + entry_path = layoutmanager.get_instance().get_entry_path(uid) + os.removedirs(entry_path) + + self.Deleted(uid) + logger.debug('deleted %s', uid) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Deleted(self, uid): + pass + + def stop(self): + """shutdown the service""" + self._index_store.close_index() + self.Stopped() + + @dbus.service.signal(DS_DBUS_INTERFACE) + def Stopped(self): + pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="sa{sv}", + out_signature='s') + def mount(self, uri, options=None): + return '' + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="", + out_signature="aa{sv}") + def mounts(self): + return [{'id': 1}] + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="s", + out_signature="") + def unmount(self, mountpoint_id): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") + def Mounted(self, descriptior): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") + def Unmounted(self, descriptor): + pass |