Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/datastore/src/carquinyol/datastore.py
diff options
context:
space:
mode:
Diffstat (limited to 'datastore/src/carquinyol/datastore.py')
-rw-r--r--datastore/src/carquinyol/datastore.py354
1 files changed, 354 insertions, 0 deletions
diff --git a/datastore/src/carquinyol/datastore.py b/datastore/src/carquinyol/datastore.py
new file mode 100644
index 0000000..82a6207
--- /dev/null
+++ b/datastore/src/carquinyol/datastore.py
@@ -0,0 +1,354 @@
+# Copyright (C) 2008, One Laptop Per Child
+# Based on code Copyright (C) 2007, ObjectRealms, LLC
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import logging
+import uuid
+import time
+import os
+import traceback
+
+import dbus
+import dbus.service
+import gobject
+
+from sugar import mime
+
+from carquinyol import layoutmanager
+from carquinyol import migration
+from carquinyol.layoutmanager import MAX_QUERY_LIMIT
+from carquinyol.metadatastore import MetadataStore
+from carquinyol.indexstore import IndexStore
+from carquinyol.filestore import FileStore
+from carquinyol.optimizer import Optimizer
+
+# the name used by the logger
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+
+DS_SERVICE = "org.laptop.sugar.DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
+DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
+
+logger = logging.getLogger(DS_LOG_CHANNEL)
+
+
+class DataStore(dbus.service.Object):
+ """D-Bus API and logic for connecting all the other components.
+ """
+
+ def __init__(self, **options):
+ bus_name = dbus.service.BusName(DS_SERVICE,
+ bus=dbus.SessionBus(),
+ replace_existing=False,
+ allow_replacement=False)
+ dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
+
+ migrated = self._migrate()
+
+ self._metadata_store = MetadataStore()
+ self._file_store = FileStore()
+ self._optimizer = Optimizer(self._file_store, self._metadata_store)
+ self._index_store = IndexStore()
+ self._index_updating = False
+
+ if migrated:
+ self._rebuild_index()
+ return
+
+ try:
+ self._index_store.open_index()
+ except Exception:
+ logging.exception('Failed to open index, will rebuild')
+ self._rebuild_index()
+ return
+
+ if not self._index_store.index_updated:
+ logging.debug('Index is not up-to-date, will update')
+ self._update_index()
+
+ def _migrate(self):
+ """Check version of data store on disk and migrate if necessary.
+
+ Returns True if migration was done and an index rebuild is required,
+ False otherwise.
+ """
+ layout_manager = layoutmanager.get_instance()
+ old_version = layout_manager.get_version()
+ if old_version == layoutmanager.CURRENT_LAYOUT_VERSION:
+ return False
+
+ if old_version == 0:
+ migration.migrate_from_0()
+
+ layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION)
+ return True
+
+ def _rebuild_index(self):
+ """Remove and recreate index."""
+ self._index_store.close_index()
+ self._index_store.remove_index()
+ self._index_store.open_index()
+ self._update_index()
+
+ def _update_index(self):
+ """Find entries that are not yet in the index and add them."""
+ uids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with object_ids %r',
+ uids)
+ self._index_updating = True
+ gobject.idle_add(lambda: self.__update_index_cb(uids),
+ priority=gobject.PRIORITY_LOW)
+
+ def __update_index_cb(self, uids):
+ if uids:
+ uid = uids.pop()
+
+ logging.debug('Updating entry %r in index. %d to go.', uid,
+ len(uids))
+
+ if not self._index_store.contains(uid):
+ try:
+ props = self._metadata_store.retrieve(uid)
+ self._index_store.store(uid, props)
+ except Exception:
+ logging.exception('Error processing %r', uid)
+
+ if not uids:
+ self._index_store.flush()
+ self._index_updating = False
+ logging.debug('Finished updating index.')
+ return False
+ else:
+ return True
+
+ def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb,
+ async_err_cb, uid, exc)
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Created(uid)
+ self._optimizer.optimize(uid)
+ logger.debug('created %s', uid)
+ async_cb(uid)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='a{sv}sb',
+ out_signature='s',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def create(self, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ uid = str(uuid.uuid4())
+ logging.debug('datastore.create %r', uid)
+
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._create_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Created(self, uid):
+ pass
+
+ def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug('_update_completion_cb() called with %r / %r, exc %r',
+ async_cb, async_err_cb, exc)
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Updated(uid)
+ self._optimizer.optimize(uid)
+ logger.debug('updated %s', uid)
+ async_cb()
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def update(self, uid, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ logging.debug('datastore.update %r', uid)
+
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+
+ if os.path.exists(self._file_store.get_file_path(uid)) and \
+ (not file_path or os.path.exists(file_path)):
+ self._optimizer.remove(uid)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._update_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Updated(self, uid):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='a{sv}as',
+ out_signature='aa{sv}u')
+ def find(self, query, properties):
+ logging.debug('datastore.find %r', query)
+ t = time.time()
+
+ if not self._index_updating:
+ try:
+ uids, count = self._index_store.find(query)
+ except Exception:
+ logging.exception('Failed to query index, will rebuild')
+ self._rebuild_index()
+
+ if self._index_updating:
+ logging.warning('Index updating, returning all entries')
+ return self._find_all(query, properties)
+
+ entries = []
+ for uid in uids:
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ if not os.path.exists(entry_path):
+ logging.warning(
+ 'Inconsistency detected, returning all entries')
+ self._rebuild_index()
+ return self._find_all(query, properties)
+
+ metadata = self._metadata_store.retrieve(uid, properties)
+ entries.append(metadata)
+
+ logger.debug('find(): %r', time.time() - t)
+
+ return entries, count
+
+ def _find_all(self, query, properties):
+ uids = layoutmanager.get_instance().find_all()
+ count = len(uids)
+
+ offset = query.get('offset', 0)
+ limit = query.get('limit', MAX_QUERY_LIMIT)
+ uids = uids[offset:offset + limit]
+
+ entries = []
+ for uid in uids:
+ metadata = self._metadata_store.retrieve(uid, properties)
+ entries.append(metadata)
+
+ return entries, count
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='s',
+ sender_keyword='sender')
+ def get_filename(self, uid, sender=None):
+ logging.debug('datastore.get_filename %r', uid)
+ user_id = dbus.Bus().get_unix_user(sender)
+ extension = self._get_extension(uid)
+ return self._file_store.retrieve(uid, user_id, extension)
+
+ def _get_extension(self, uid):
+ mime_type = self._metadata_store.get_property(uid, 'mime_type')
+ if mime_type is None or not mime_type:
+ return ''
+ return mime.get_primary_extension(mime_type)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='a{sv}')
+ def get_properties(self, uid):
+ logging.debug('datastore.get_properties %r', uid)
+ metadata = self._metadata_store.retrieve(uid)
+ return metadata
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}',
+ out_signature='as')
+ def get_uniquevaluesfor(self, propertyname, query=None):
+ if propertyname != 'activity':
+ raise ValueError('Only ''activity'' is a supported property name')
+ if query:
+ raise ValueError('The query parameter is not supported')
+ if not self._index_updating:
+ return self._index_store.get_activities()
+ else:
+ logging.warning('Index updating, returning an empty list')
+ return []
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='')
+ def delete(self, uid):
+ self._optimizer.remove(uid)
+
+ self._index_store.delete(uid)
+ self._file_store.delete(uid)
+ self._metadata_store.delete(uid)
+
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ os.removedirs(entry_path)
+
+ self.Deleted(uid)
+ logger.debug('deleted %s', uid)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Deleted(self, uid):
+ pass
+
+ def stop(self):
+ """shutdown the service"""
+ self._index_store.close_index()
+ self.Stopped()
+
+ @dbus.service.signal(DS_DBUS_INTERFACE)
+ def Stopped(self):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="sa{sv}",
+ out_signature='s')
+ def mount(self, uri, options=None):
+ return ''
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="",
+ out_signature="aa{sv}")
+ def mounts(self):
+ return [{'id': 1}]
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="s",
+ out_signature="")
+ def unmount(self, mountpoint_id):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+ def Mounted(self, descriptior):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+ def Unmounted(self, descriptor):
+ pass