# Copyright (C) 2008, One Laptop Per Child # Based on code Copyright (C) 2007, ObjectRealms, LLC # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import logging import uuid import time import os import traceback import dbus import dbus.service import gobject from sugar import mime from carquinyol import layoutmanager from carquinyol import migration from carquinyol.layoutmanager import MAX_QUERY_LIMIT from carquinyol.metadatastore import MetadataStore from carquinyol.indexstore import IndexStore from carquinyol.filestore import FileStore from carquinyol.optimizer import Optimizer # the name used by the logger DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' DS_SERVICE = "org.laptop.sugar.DataStore" DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" logger = logging.getLogger(DS_LOG_CHANNEL) class DataStore(dbus.service.Object): """D-Bus API and logic for connecting all the other components. """ def __init__(self, **options): bus_name = dbus.service.BusName(DS_SERVICE, bus=dbus.SessionBus(), replace_existing=False, allow_replacement=False) dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) migrated = self._migrate() self._metadata_store = MetadataStore() self._file_store = FileStore() self._optimizer = Optimizer(self._file_store, self._metadata_store) self._index_store = IndexStore() if migrated: self._rebuild_index() return try: self._index_store.open_index() except Exception: logging.exception('Failed to open index, will rebuild') self._rebuild_index() return if not layoutmanager.get_instance().index_updated: logging.debug('Index is not up-to-date, will update') self._update_index() def _migrate(self): """Check version of data store on disk and migrate if necessary. Returns True if migration was done and an index rebuild is required, False otherwise. """ layout_manager = layoutmanager.get_instance() old_version = layout_manager.get_version() if old_version == layoutmanager.CURRENT_LAYOUT_VERSION: return False if old_version == 0: migration.migrate_from_0() layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION) return True def _rebuild_index(self): """Remove and recreate index.""" layoutmanager.get_instance().index_updated = False self._index_store.close_index() self._index_store.remove_index() self._index_store.open_index() self._update_index() def _update_index(self): """Find entries that are not yet in the index and add them.""" uids = layoutmanager.get_instance().find_all() logging.debug('Going to update the index with object_ids %r', uids) gobject.idle_add(lambda: self.__update_index_cb(uids), priority=gobject.PRIORITY_LOW) def __update_index_cb(self, uids): if uids: uid = uids.pop() logging.debug('Updating entry %r in index. %d to go.', uid, len(uids)) if not self._index_store.contains(uid): try: props = self._metadata_store.retrieve(uid) self._index_store.store(uid, props) except Exception: logging.exception('Error processing %r', uid) if not uids: logging.debug('Finished updating index.') layoutmanager.get_instance().index_updated = True return False else: return True def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb, async_err_cb, uid, exc) if exc is not None: async_err_cb(exc) return self.Created(uid) self._optimizer.optimize(uid) logger.debug('created %s', uid) async_cb(uid) @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}sb', out_signature='s', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) def create(self, props, file_path, transfer_ownership, async_cb, async_err_cb): uid = str(uuid.uuid4()) logging.debug('datastore.create %r', uid) if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) self._metadata_store.store(uid, props) self._index_store.store(uid, props) self._file_store.store(uid, file_path, transfer_ownership, lambda *args: self._create_completion_cb(async_cb, async_err_cb, uid, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Created(self, uid): pass def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): logger.debug('_update_completion_cb() called with %r / %r, exc %r', async_cb, async_err_cb, exc) if exc is not None: async_err_cb(exc) return self.Updated(uid) self._optimizer.optimize(uid) logger.debug('updated %s', uid) async_cb() @dbus.service.method(DS_DBUS_INTERFACE, in_signature='sa{sv}sb', out_signature='', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) def update(self, uid, props, file_path, transfer_ownership, async_cb, async_err_cb): logging.debug('datastore.update %r', uid) if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) self._metadata_store.store(uid, props) self._index_store.store(uid, props) if os.path.exists(self._file_store.get_file_path(uid)) and \ (not file_path or os.path.exists(file_path)): self._optimizer.remove(uid) self._file_store.store(uid, file_path, transfer_ownership, lambda *args: self._update_completion_cb(async_cb, async_err_cb, uid, *args)) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Updated(self, uid): pass @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}as', out_signature='aa{sv}u') def find(self, query, properties): logging.debug('datastore.find %r', query) t = time.time() if layoutmanager.get_instance().index_updated: try: uids, count = self._index_store.find(query) except Exception: logging.exception('Failed to query index, will rebuild') self._rebuild_index() if not layoutmanager.get_instance().index_updated: logging.warning('Index updating, returning all entries') return self._find_all(query, properties) entries = [] for uid in uids: entry_path = layoutmanager.get_instance().get_entry_path(uid) if not os.path.exists(entry_path): logging.warning( 'Inconsistency detected, returning all entries') self._rebuild_index() return self._find_all(query, properties) metadata = self._metadata_store.retrieve(uid, properties) entries.append(metadata) logger.debug('find(): %r', time.time() - t) return entries, count def _find_all(self, query, properties): uids = layoutmanager.get_instance().find_all() count = len(uids) offset = query.get('offset', 0) limit = query.get('limit', MAX_QUERY_LIMIT) uids = uids[offset:offset + limit] entries = [] for uid in uids: metadata = self._metadata_store.retrieve(uid, properties) entries.append(metadata) return entries, count @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='s', sender_keyword='sender') def get_filename(self, uid, sender=None): logging.debug('datastore.get_filename %r', uid) user_id = dbus.Bus().get_unix_user(sender) extension = self._get_extension(uid) return self._file_store.retrieve(uid, user_id, extension) def _get_extension(self, uid): mime_type = self._metadata_store.get_property(uid, 'mime_type') if mime_type is None or not mime_type: return '' return mime.get_primary_extension(mime_type) @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='a{sv}') def get_properties(self, uid): logging.debug('datastore.get_properties %r', uid) metadata = self._metadata_store.retrieve(uid) return metadata @dbus.service.method(DS_DBUS_INTERFACE, in_signature='sa{sv}', out_signature='as') def get_uniquevaluesfor(self, propertyname, query=None): if propertyname != 'activity': raise ValueError('Only ''activity'' is a supported property name') if query: raise ValueError('The query parameter is not supported') if layoutmanager.get_instance().index_updated: return self._index_store.get_activities() else: logging.warning('Index updating, returning an empty list') return [] @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='') def delete(self, uid): self._optimizer.remove(uid) self._index_store.delete(uid) self._file_store.delete(uid) self._metadata_store.delete(uid) entry_path = layoutmanager.get_instance().get_entry_path(uid) os.removedirs(entry_path) self.Deleted(uid) logger.debug('deleted %s', uid) @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") def Deleted(self, uid): pass def stop(self): """shutdown the service""" self._index_store.close_index() self.Stopped() @dbus.service.signal(DS_DBUS_INTERFACE) def Stopped(self): pass @dbus.service.method(DS_DBUS_INTERFACE, in_signature="sa{sv}", out_signature='s') def mount(self, uri, options=None): return '' @dbus.service.method(DS_DBUS_INTERFACE, in_signature="", out_signature="aa{sv}") def mounts(self): return [{'id': 1}] @dbus.service.method(DS_DBUS_INTERFACE, in_signature="s", out_signature="") def unmount(self, mountpoint_id): pass @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") def Mounted(self, descriptior): pass @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") def Unmounted(self, descriptor): pass