Diffstat (limited to 'src/carquinyol')
-rw-r--r--  src/carquinyol/datastore.py      | 698
-rw-r--r--  src/carquinyol/filestore.py      |  48
-rw-r--r--  src/carquinyol/indexstore.py     | 143
-rw-r--r--  src/carquinyol/layoutmanager.py  |  61
-rw-r--r--  src/carquinyol/metadatastore.py  |  37
-rw-r--r--  src/carquinyol/migration.py      | 123
-rw-r--r--  src/carquinyol/optimizer.py      |  82
7 files changed, 846 insertions(+), 346 deletions(-)
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index 0fc8be1..bc8e208 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -25,9 +25,11 @@ import os
import dbus
import dbus.service
+import gconf
import gobject
from sugar import mime
+from sugar.logger import trace
from carquinyol import layoutmanager
from carquinyol import migration
@@ -41,12 +43,207 @@ from carquinyol.optimizer import Optimizer
DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
DS_SERVICE = "org.laptop.sugar.DataStore"
-DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
-DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE1 = "org.laptop.sugar.DataStore"
+DS_OBJECT_PATH1 = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE2 = "org.laptop.sugar.DataStore2"
+DS_OBJECT_PATH2 = "/org/laptop/sugar/DataStore2"
logger = logging.getLogger(DS_LOG_CHANNEL)
+class DBusAPIv1(dbus.service.Object):
+ """Compatibility layer for old (unversioned) D-Bus API
+ """
+
+ # pylint: disable-msg=W0212
+
+ def __init__(self, data_store):
+ self._data_store = data_store
+ bus_name = dbus.service.BusName(DS_SERVICE, bus=dbus.SessionBus(),
+ replace_existing=False, allow_replacement=False)
+ dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH1)
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='a{sv}sb', out_signature='s',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def create(self, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+
+ def success_cb(tree_id, child_id):
+ self.Created(tree_id)
+ async_cb(tree_id)
+
+ self._data_store._save(tree_id='', parent_id='', metadata=props,
+ path=file_path, delete_after=transfer_ownership,
+ async_cb=success_cb, async_err_cb=async_err_cb)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+ def Created(self, uid):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def update(self, uid, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+
+ def success_cb(tree_id, child_id):
+ self.Updated(tree_id)
+ async_cb()
+
+ latest_versions = self._get_latest(uid)
+ if not latest_versions:
+ raise ValueError('Trying to update non-existent entry %s - did you'
+ ' mean to use create()?' % (uid, ))
+
+ parent = latest_versions[0]
+ if self._compare_checksums(parent, file_path):
+ self._data_store._change_metadata(parent['tree_id'],
+ parent['version_id'], props)
+ return success_cb(uid, None)
+
+ self._data_store._save(tree_id=uid,
+ parent_id=parent['version_id'], metadata=props,
+ path=file_path, delete_after=transfer_ownership,
+ async_cb=success_cb, async_err_cb=async_err_cb)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+ def Updated(self, uid):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='a{sv}as',
+ out_signature='aa{sv}u')
+ def find(self, query, properties):
+ if 'uid' in properties:
+ properties.append('tree_id')
+ properties.remove('uid')
+
+ options = {'metadata': properties}
+ for name in ['offset', 'limit', 'order_by']:
+ if name in query:
+ options[name] = query.pop(name)
+
+ if 'uid' in query:
+ query['tree_id'] = query.pop('uid')
+
+ results, count = self._data_store._find(query, options,
+ query.pop('query', None))
+
+ if 'tree_id' in properties:
+ for entry in results:
+ entry['uid'] = entry.pop('tree_id')
+
+ return results, count
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='s',
+ out_signature='s',
+ sender_keyword='sender')
+ def get_filename(self, uid, sender=None):
+ latest_versions = self._get_latest(uid)
+ if not latest_versions:
+ raise ValueError('Entry %s does not exist' % (uid, ))
+
+ return self._data_store._get_data(uid,
+ latest_versions[0]['version_id'], sender=sender)
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='s',
+ out_signature='a{sv}')
+ def get_properties(self, uid):
+ latest_versions = self._get_latest(uid)
+ if not latest_versions:
+ raise ValueError('Entry %s does not exist' % (uid, ))
+
+ latest_versions[0]['uid'] = latest_versions[0].pop('tree_id')
+ del latest_versions[0]['version_id']
+ return latest_versions[0]
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='sa{sv}',
+ out_signature='as')
+ def get_uniquevaluesfor(self, propertyname, query=None):
+ return self._data_store._find_unique_values(query, propertyname)
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature='s',
+ out_signature='')
+ def delete(self, uid):
+ latest_versions = self._get_latest(uid)
+ if not latest_versions:
+ raise ValueError('Entry %s does not exist' % (uid, ))
+
+ self._data_store._delete(tree_id=uid,
+ version_id=latest_versions[0]['version_id'])
+ self.Deleted(uid)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+ def Deleted(self, uid):
+ pass
+
+ def stop(self):
+ """shutdown the service"""
+ self._data_store._stop()
+ self.Stopped()
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1)
+ def Stopped(self):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature="sa{sv}",
+ out_signature='s')
+ def mount(self, uri, options=None):
+ return ''
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature="",
+ out_signature="aa{sv}")
+ def mounts(self):
+ return [{'id': 1}]
+
+ @dbus.service.method(DS_DBUS_INTERFACE1,
+ in_signature="s",
+ out_signature="")
+ def unmount(self, mountpoint_id):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}")
+ def Mounted(self, descriptor):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}")
+ def Unmounted(self, descriptor):
+ pass
+
+ def _get_latest(self, uid):
+ return self._data_store._find({'tree_id': uid},
+ {'limit': 1, 'order_by': ['+timestamp']})[0]
+
+ @trace()
+ def _compare_checksums(self, parent, child_data):
+ parent_data = self._data_store._file_store.get_file_path(
+ (parent['tree_id'], parent['version_id']))
+ if bool(child_data) ^ os.path.exists(parent_data):
+ return False
+ elif not child_data:
+ return True
+
+ if 'checksum' in parent:
+ parent_checksum = parent['checksum']
+ else:
+ parent_checksum = self._data_store._optimizer.calculate_md5sum(
+ parent_data)
+
+ child_checksum = self._data_store._optimizer.calculate_md5sum(
+ child_data)
+ return parent_checksum == child_checksum
+
+
class DataStore(dbus.service.Object):
"""D-Bus API and logic for connecting all the other components.
"""
@@ -56,7 +253,12 @@ class DataStore(dbus.service.Object):
bus=dbus.SessionBus(),
replace_existing=False,
allow_replacement=False)
- dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
+ dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH2)
+ self._api_v1 = DBusAPIv1(self)
+ gconf_client = gconf.client_get_default()
+ self._max_versions = gconf_client.get_int(
+ '/desktop/sugar/datastore/max_versions')
+ logging.debug('max_versions=%r', self._max_versions)
migrated, initiated = self._open_layout()
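
Illustration (not part of the patch): the version pruning added above is driven
by a GConf key read once at startup; a falsy value (GConf's default of 0 for an
unset integer key) disables pruning entirely, since _check_max_versions()
returns early. A minimal sketch of tuning the knob through the same gconf
bindings the daemon imports:

    import gconf

    # Keep at most 5 versions per tree; 0 (the default) disables pruning.
    client = gconf.client_get_default()
    client.set_int('/desktop/sugar/datastore/max_versions', 5)
    print client.get_int('/desktop/sugar/datastore/max_versions')
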
@@ -104,7 +306,7 @@ class DataStore(dbus.service.Object):
if old_version == 0:
migration.migrate_from_0()
- elif old_version in [1, 2, 4]
+ elif old_version in [1, 2, 4]:
migration.migrate_from_1()
layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION)
@@ -119,26 +321,26 @@ class DataStore(dbus.service.Object):
def _update_index(self):
"""Find entries that are not yet in the index and add them."""
- uids = layoutmanager.get_instance().find_all()
+ object_ids = layoutmanager.get_instance().find_all()
logging.debug('Going to update the index with object_ids %r',
- uids)
+ object_ids)
self._index_updating = True
- gobject.idle_add(lambda: self.__update_index_cb(uids),
+ gobject.idle_add(lambda: self.__update_index_cb(object_ids),
priority=gobject.PRIORITY_LOW)
- def __update_index_cb(self, uids):
- if uids:
- uid = uids.pop()
+ def __update_index_cb(self, object_ids):
+ if object_ids:
+ object_id = object_ids.pop()
- logging.debug('Updating entry %r in index. %d to go.', uid,
- len(uids))
+ logging.debug('Updating entry %r in index. %d to go.', object_id,
+ len(object_ids))
- if not self._index_store.contains(uid):
+ if not self._index_store.contains(object_id):
try:
update_metadata = False
- props = self._metadata_store.retrieve(uid)
+ props = self._metadata_store.retrieve(object_id)
if 'filesize' not in props:
- path = self._file_store.get_file_path(uid)
+ path = self._file_store.get_file_path(object_id)
if os.path.exists(path):
props['filesize'] = os.stat(path).st_size
update_metadata = True
@@ -157,12 +359,12 @@ class DataStore(dbus.service.Object):
props['creation_time'] = props['timestamp']
update_metadata = True
if update_metadata:
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
+ self._metadata_store.store(object_id, props)
+ self._index_store.store(object_id, props)
except Exception:
- logging.exception('Error processing %r', uid)
+ logging.exception('Error processing %r', object_id)
- if not uids:
+ if not object_ids:
self._index_store.flush()
self._index_updating = False
logging.debug('Finished updating index.')
@@ -170,199 +372,314 @@ class DataStore(dbus.service.Object):
else:
return True
- def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb,
- async_err_cb, uid, exc)
- if exc is not None:
- async_err_cb(exc)
- return
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='ssa{sv}sb',
+ out_signature='ss',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def save(self, tree_id, parent_id, metadata, path, delete_after, async_cb,
+ async_err_cb):
+ """Store new (version of an) object and return (tree_id, child_id).
+
+ Submit new version (data and/or metadata) of the given object,
+ returning (tree_id, child_id). parent_id and child_id are the
+ version_ids of the predecessor and of the to-be-saved entry,
+ respectively.
+
+ If tree_id is empty, parent_id must be empty as well and both will be
+ newly allocated (equivalent to create() in the previous API). If only
+ parent_id is empty, there must not be any entry with the same tree_id
+ in the data store already.
+
+ File storage happens synchronously, emitting the signal
+ Saved(tree_id, parent_id, child_id, metadata) on completion (i.e.
+ before returning from save()).
+
+ If path is non-empty it designates a file or directory containing the
+ data, and the parent, if one exists at all, must contain data of the
+ same type (file / directory; not metadata-only). If path is empty and
+ the parent contained data then a metadata-only update is performed,
+ reusing the data of the parent. If both parent_id and path are empty
+ a metadata-only entry is created.
+
+ If delete_after is True the contents of path are deleted after they
+ have been committed, but before sending the Saved() signal.
+
+ metadata may contain a version_id field that will be used for the new
+ entry. In that case tree_id must be non-empty and there must not be an
+ existing entry in the data store with the same tree_id and version_id.
+ This functionality is intended for restoring from backups and importing
+ entries from other data stores; most API clients should leave it up to
+ the data store to choose a unique version_id.
+ """
+ self._save(tree_id, parent_id, metadata, path, delete_after, async_cb,
+ async_err_cb)
- self.Created(uid)
- self._optimizer.optimize(uid)
- logger.debug('created %s', uid)
- async_cb(uid)
+ @trace(logger=logger)
+ def _save(self, tree_id, parent_id, metadata, path, delete_after, async_cb,
+ async_err_cb):
+ if path:
+ if not os.access(path, os.R_OK):
+ raise ValueError('Invalid path given.')
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}sb',
- out_signature='s',
- async_callbacks=('async_cb', 'async_err_cb'),
- byte_arrays=True)
- def create(self, props, file_path, transfer_ownership,
- async_cb, async_err_cb):
- uid = str(uuid.uuid4())
- logging.debug('datastore.create %r', uid)
+ if delete_after and not os.access(os.path.dirname(path), os.W_OK):
+ raise ValueError('Deletion requested for read-only directory')
+
+ if (not tree_id) and parent_id:
+ raise ValueError('tree_id is empty but parent_id is not')
+
+ if tree_id and not parent_id:
+ if self._find({'tree_id': tree_id}, {'limit': 1})[1]:
+ raise ValueError('No parent_id given but tree_id already '
+ 'exists')
+
+ elif parent_id:
+ if not self._find({'tree_id': tree_id, 'version_id': parent_id},
+ {'limit': 1})[1]:
+ raise ValueError('Given parent_id does not exist')
+
+ child_id = metadata.get('version_id')
+ if not child_id:
+ child_id = self._gen_uuid()
+ elif not tree_id:
+ raise ValueError('No tree_id given but metadata contains version_id')
+ elif self._find({'tree_id': tree_id, 'version_id': child_id},
+ {'limit': 1})[1]:
+ raise ValueError('There is an existing entry with the same tree_id'
+ ' and version_id')
- if not props.get('timestamp', ''):
- props['timestamp'] = int(time.time())
+ if not tree_id:
+ tree_id = self._gen_uuid()
+
+ metadata.setdefault('timestamp', time.time())
+ metadata['parent_id'] = parent_id
# FIXME: Support for the deprecated ctime property. Remove in 0.92.
- if 'ctime' in props:
+ if 'ctime' in metadata:
try:
- props['creation_time'] = time.mktime(time.strptime(
- migration.DATE_FORMAT, props['ctime']))
+ metadata['creation_time'] = time.mktime(time.strptime(
+ metadata['ctime'], migration.DATE_FORMAT))
except (TypeError, ValueError):
pass
- if 'creation_time' not in props:
- props['creation_time'] = props['timestamp']
-
- if os.path.exists(file_path):
- stat = os.stat(file_path)
- props['filesize'] = stat.st_size
- else:
- props['filesize'] = 0
+ if 'creation_time' not in metadata:
+ metadata['creation_time'] = metadata['timestamp']
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership,
- lambda * args: self._create_completion_cb(async_cb,
- async_err_cb,
- uid,
- * args))
+ # FIXME: how to handle creation_time vs. versions?
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Created(self, uid):
- pass
+ if os.path.exists(path):
+ stat = os.stat(path)
+ metadata['filesize'] = stat.st_size
+ else:
+ metadata['filesize'] = 0
+
+ object_id = (tree_id, child_id)
+ self._metadata_store.store(object_id, metadata)
+ self._index_store.store(object_id, metadata)
+
+ if (not path) and self._file_store.has_data((tree_id, parent_id)):
+ # metadata-only update, reuse parent data
+ path = self._file_store.retrieve((tree_id, parent_id),
+ os.getuid(), 'update')
+ delete_after = True
+
+ if path:
+ self._file_store.store(object_id, path, delete_after,
+ lambda exc=None: self._save_completion_cb(
+ tree_id, parent_id, child_id, metadata,
+ async_cb, async_err_cb, exc=exc))
+ else:
+ self._save_completion_cb(tree_id, parent_id, child_id, metadata,
+ async_cb, async_err_cb)
- def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
- logger.debug('_update_completion_cb() called with %r / %r, exc %r',
- async_cb, async_err_cb, exc)
+ @trace(logger=logger)
+ def _save_completion_cb(self, tree_id, parent_id, child_id, metadata,
+ async_cb, async_err_cb, exc=None):
+ object_id = (tree_id, child_id)
if exc is not None:
async_err_cb(exc)
return
- self.Updated(uid)
- self._optimizer.optimize(uid)
- logger.debug('updated %s', uid)
- async_cb()
+ self._check_max_versions(tree_id)
+ self.Saved(tree_id, parent_id, child_id, metadata)
+ self._optimizer.optimize(object_id)
+ async_cb(tree_id, child_id)
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}sb',
- out_signature='',
- async_callbacks=('async_cb', 'async_err_cb'),
- byte_arrays=True)
- def update(self, uid, props, file_path, transfer_ownership,
- async_cb, async_err_cb):
- logging.debug('datastore.update %r', uid)
+ def _check_max_versions(self, tree_id):
+ if not self._max_versions:
+ return
- if not props.get('timestamp', ''):
- props['timestamp'] = int(time.time())
+ old_versions = self._find({'tree_id': tree_id},
+ {'all_versions': True, 'offset': self._max_versions,
+ 'metadata': ['tree_id', 'version_id', 'timestamp'],
+ 'order_by': ['+timestamp']})[0]
+ logging.info('Deleting old versions: %r', old_versions)
+ for entry in old_versions:
+ self._delete(entry['tree_id'], entry['version_id'])
- # FIXME: Support for the deprecated ctime property. Remove in 0.92.
- if 'ctime' in props:
- try:
- props['creation_time'] = time.mktime(time.strptime(
- migration.DATE_FORMAT, props['ctime']))
- except (TypeError, ValueError):
- pass
+ @dbus.service.signal(DS_DBUS_INTERFACE2, signature='sssa{sv}')
+ def Saved(self, tree_id, parent_id, child_id, metadata):
+ """Entry created or updated.
- if 'creation_time' not in props:
- props['creation_time'] = props['timestamp']
-
- if file_path:
- # Empty file_path means skipping storage stage, see filestore.py
- # TODO would be more useful to update filesize after real file save
- if os.path.exists(file_path):
- stat = os.stat(file_path)
- props['filesize'] = stat.st_size
- else:
- props['filesize'] = 0
-
- self._metadata_store.store(uid, props)
- self._index_store.store(uid, props)
-
- if os.path.exists(self._file_store.get_file_path(uid)) and \
- (not file_path or os.path.exists(file_path)):
- self._optimizer.remove(uid)
- self._file_store.store(uid, file_path, transfer_ownership,
- lambda * args: self._update_completion_cb(async_cb,
- async_err_cb,
- uid,
- * args))
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Updated(self, uid):
+ Entry identified by object_id has been saved to permanent storage.
+ """
pass
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='a{sv}as',
- out_signature='aa{sv}u')
- def find(self, query, properties):
- logging.debug('datastore.find %r', query)
- t = time.time()
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='ssa{sv}',
+ out_signature='',
+ byte_arrays=True)
+ def change_metadata(self, tree_id, version_id, metadata):
+ """Change metadata of existing entry.
+
+ Changes the metadata of the object identified by object_id to match
+ metadata. Fully synchronous, no return value.
+ Emits signal ChangedMetadata(tree_id, version_id, metadata).
+ """
+ self._change_metadata(tree_id, version_id, metadata)
+
+ @trace(logger=logger)
+ def _change_metadata(self, tree_id, version_id, metadata):
+ object_id = (tree_id, version_id)
+ metadata.setdefault('timestamp', time.time())
+ self._metadata_store.store(object_id, metadata)
+ self._index_store.store(object_id, metadata)
+ self.ChangedMetadata(tree_id, version_id, metadata)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssa{sv}')
+ def ChangedMetadata(self, tree_id, version_id, metadata):
+ """Metadata of entry changed."""
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='a{sv}a{sv}',
+ out_signature='aa{sv}u',
+ byte_arrays=True)
+ def find(self, query, options):
+ return self._find(query, options)
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='sa{sv}a{sv}',
+ out_signature='aa{sv}u',
+ byte_arrays=True)
+ def textsearch(self, querystring, query, options):
+ return self._find(query, options, querystring)
+
+ @trace(logger=logger)
+ def _find(self, query, options, querystring=None):
if not self._index_updating:
try:
- uids, count = self._index_store.find(query)
+ object_ids, count = self._index_store.find(query, querystring,
+ options)
except Exception:
logging.exception('Failed to query index, will rebuild')
self._rebuild_index()
if self._index_updating:
logging.warning('Index updating, returning all entries')
- return self._find_all(query, properties)
+ return self._find_all(query, options)
entries = []
- for uid in uids:
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ for object_id in object_ids:
+ entry_path = layoutmanager.get_instance().get_entry_path(object_id)
if not os.path.exists(entry_path):
logging.warning(
'Inconsistency detected, returning all entries')
self._rebuild_index()
- return self._find_all(query, properties)
+ return self._find_all(query, options)
- metadata = self._metadata_store.retrieve(uid, properties)
+ metadata = self._metadata_store.retrieve(object_id,
+ options.get('metadata'))
entries.append(metadata)
- logger.debug('find(): %r', time.time() - t)
-
return entries, count
- def _find_all(self, query, properties):
- uids = layoutmanager.get_instance().find_all()
- count = len(uids)
+ def _find_all(self, query, options):
+ object_ids = layoutmanager.get_instance().find_all()
+
+ if not options.get('all_versions', False):
+ tids_vtime = {}
+ for tree_id, version_id in object_ids:
+ timestamp = self._metadata_store.get_property(
+ (tree_id, version_id), 'timestamp')
+ tids_vtime.setdefault(tree_id, []).append(
+ (version_id, timestamp))
+
+ # keep only the latest version of each tree
+ object_ids = []
+ for tree_id, candidates in tids_vtime.items():
+ candidates.sort(key=lambda candidate: candidate[1],
+ reverse=True)
+ object_ids.append((tree_id, candidates[0][0]))
+
+ count = len(object_ids)
- offset = query.get('offset', 0)
- limit = query.get('limit', MAX_QUERY_LIMIT)
- uids = uids[offset:offset + limit]
+ offset = options.get('offset', 0)
+ limit = options.get('limit', MAX_QUERY_LIMIT)
+ object_ids = object_ids[offset:offset + limit]
entries = []
- for uid in uids:
- metadata = self._metadata_store.retrieve(uid, properties)
+ for object_id in object_ids:
+ metadata = self._metadata_store.retrieve(object_id,
+ options.get('metadata'))
entries.append(metadata)
return entries, count
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='ss',
out_signature='s',
- sender_keyword='sender')
- def get_filename(self, uid, sender=None):
- logging.debug('datastore.get_filename %r', uid)
+ sender_keyword='sender',
+ byte_arrays=True)
+ @trace(logger=logger)
+ def get_data(self, tree_id, version_id, sender=None):
+ """Make given entry readable and return path.
+
+ Hardlinks the given object into a global directory, making it readable
+ to the caller. Fully synchronous.
+ Signal GotData(sender, tree_id, version_id, path) is emitted when data
+ is available. Sender is the bus name of the sender of the get_data()
+ request. GotData() may be sent before get_data() returns.
+ """
+ return self._get_data(tree_id, version_id, sender)
+
+ def _get_data(self, tree_id, version_id, sender=None):
+ object_id = (tree_id, version_id)
user_id = dbus.Bus().get_unix_user(sender)
- extension = self._get_extension(uid)
- return self._file_store.retrieve(uid, user_id, extension)
+ extension = self._get_extension(object_id)
+ path = self._file_store.retrieve(object_id, user_id, extension)
+ self.GotData(sender, tree_id, version_id, path)
+ return path
+
+ @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssss')
+ def GotData(self, sender, tree_id, version_id, path):
+ """Data is available.
+
+ Data of entry identified by (tree_id, version_id) as requested by
+ sender is available at path now. Permissions are matching the sender,
+ so path may not be accessible to all listeners.
+ """
+ pass
- def _get_extension(self, uid):
- mime_type = self._metadata_store.get_property(uid, 'mime_type')
- if mime_type is None or not mime_type:
+ def _get_extension(self, object_id):
+ mime_type = self._metadata_store.get_property(object_id, 'mime_type')
+ if not mime_type:
return ''
return mime.get_primary_extension(mime_type)
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='a{sv}')
- def get_properties(self, uid):
- logging.debug('datastore.get_properties %r', uid)
- metadata = self._metadata_store.retrieve(uid)
- return metadata
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='sa{sv}',
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='a{sv}s',
out_signature='as')
- def get_uniquevaluesfor(self, propertyname, query=None):
- if propertyname != 'activity':
- raise ValueError('Only ''activity'' is a supported property name')
+ def find_unique_values(self, query, name):
+ """Collect unique values of property.
+
+ Similar to find(), but returns the set of unique values for the
+ requested property.
+ """
+ return self._find_unique_values(query, name)
+
+ def _find_unique_values(self, query, name):
+ if name != 'activity':
+ raise ValueError('Only "activity" is a supported property name')
if query:
raise ValueError('The query parameter is not supported')
if not self._index_updating:
@@ -371,57 +688,48 @@ class DataStore(dbus.service.Object):
logging.warning('Index updating, returning an empty list')
return []
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature='s',
- out_signature='')
- def delete(self, uid):
- self._optimizer.remove(uid)
+ @dbus.service.method(DS_DBUS_INTERFACE2,
+ in_signature='ss',
+ out_signature='',
+ byte_arrays=True)
+ def delete(self, tree_id, version_id):
+ """Delete entry.
+
+ Remove given object from data store. Fully synchronous. Doesn't
+ return anything, emits signal Deleted(tree_id, version_id).
+ """
+ return self._delete(tree_id, version_id)
- self._index_store.delete(uid)
- self._file_store.delete(uid)
- self._metadata_store.delete(uid)
+ def _delete(self, tree_id, version_id):
+ object_id = (tree_id, version_id)
+ self._optimizer.remove(object_id)
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ self._index_store.delete(object_id)
+ self._file_store.delete(object_id)
+ self._metadata_store.delete(object_id)
+
+ entry_path = layoutmanager.get_instance().get_entry_path(object_id)
os.removedirs(entry_path)
- self.Deleted(uid)
- logger.debug('deleted %s', uid)
+ self.Deleted(*object_id)
+ logger.debug('deleted %r', object_id)
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
- def Deleted(self, uid):
+ @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ss')
+ def Deleted(self, tree_id, version_id):
+ """Entry has been deleted."""
pass
def stop(self):
"""shutdown the service"""
+ self._stop()
+
+ def _stop(self):
self._index_store.close_index()
self.Stopped()
- @dbus.service.signal(DS_DBUS_INTERFACE)
+ @dbus.service.signal(DS_DBUS_INTERFACE2)
def Stopped(self):
pass
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="sa{sv}",
- out_signature='s')
- def mount(self, uri, options=None):
- return ''
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="",
- out_signature="aa{sv}")
- def mounts(self):
- return [{'id': 1}]
-
- @dbus.service.method(DS_DBUS_INTERFACE,
- in_signature="s",
- out_signature="")
- def unmount(self, mountpoint_id):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Mounted(self, descriptior):
- pass
-
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
- def Unmounted(self, descriptor):
- pass
+ def _gen_uuid(self):
+ return str(uuid.uuid4())
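
Illustration (not part of the patch): a client could exercise the new versioned
API over D-Bus roughly as below. The sketch uses plain dbus-python; the titles
are made up and error handling is omitted.

    import dbus

    bus = dbus.SessionBus()
    data_store = dbus.Interface(
        bus.get_object('org.laptop.sugar.DataStore',
                       '/org/laptop/sugar/DataStore2'),
        'org.laptop.sugar.DataStore2')

    # Empty tree_id/parent_id: allocate a new tree (old create()).
    tree_id, version_id = data_store.save('', '', {'title': 'example'},
                                          '', False)

    # Non-empty tree_id and parent_id: save a new version on top.
    tree_id, version_id = data_store.save(tree_id, version_id,
                                          {'title': 'example v2'}, '', False)

    # Without all_versions, find() returns only the latest version per tree.
    entries, count = data_store.find({'tree_id': tree_id},
                                     {'metadata': ['title', 'version_id']})

    # Hard-link the data into a readable location ('' if metadata-only).
    path = data_store.get_data(tree_id, version_id)
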
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
index 592c41a..ef6aba8 100644
--- a/src/carquinyol/filestore.py
+++ b/src/carquinyol/filestore.py
@@ -32,15 +32,16 @@ class FileStore(object):
# TODO: add protection against store and retrieve operations on entries
# that are being processed async.
- def store(self, uid, file_path, transfer_ownership, completion_cb):
+ def store(self, object_id, file_path, transfer_ownership, completion_cb):
"""Store a file for a given entry.
"""
- dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ layout_manager = layoutmanager.get_instance()
+ dir_path = layout_manager.get_entry_path(object_id)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
- destination_path = layoutmanager.get_instance().get_data_path(uid)
+ destination_path = layout_manager.get_data_path(object_id)
if file_path:
if not os.path.isfile(file_path):
raise ValueError('No file at %r' % file_path)
@@ -89,15 +90,19 @@ class FileStore(object):
unlink_src)
async_copy.start()
- def retrieve(self, uid, user_id, extension):
+ def has_data(self, object_id):
+ file_path = layoutmanager.get_instance().get_data_path(object_id)
+ return os.path.exists(file_path)
+
+ def retrieve(self, object_id, user_id, extension):
"""Place the file associated to a given entry into a directory
where the user can read it. The caller is responsible for
deleting this file.
"""
- file_path = layoutmanager.get_instance().get_data_path(uid)
+ file_path = layoutmanager.get_instance().get_data_path(object_id)
if not os.path.exists(file_path):
- logging.debug('Entry %r doesnt have any file', uid)
+ logging.debug('Entry %r does not have any file', object_id)
return ''
if extension is None:
@@ -125,7 +130,7 @@ class FileStore(object):
if use_instance_dir:
os.umask(old_umask)
- fd, destination_path = tempfile.mkstemp(prefix=uid + '_',
+ fd, destination_path = tempfile.mkstemp(prefix='%s-%s_' % object_id,
suffix=extension, dir=destination_dir)
os.close(fd)
os.unlink(destination_path)
@@ -143,21 +148,21 @@ class FileStore(object):
return destination_path
- def get_file_path(self, uid):
- return layoutmanager.get_instance().get_data_path(uid)
+ def get_file_path(self, object_id):
+ return layoutmanager.get_instance().get_data_path(object_id)
- def delete(self, uid):
+ def delete(self, object_id):
"""Remove the file associated to a given entry.
"""
- file_path = layoutmanager.get_instance().get_data_path(uid)
+ file_path = layoutmanager.get_instance().get_data_path(object_id)
if os.path.exists(file_path):
os.remove(file_path)
- def hard_link_entry(self, new_uid, existing_uid):
- existing_file = layoutmanager.get_instance().get_data_path(
- existing_uid)
- new_file = layoutmanager.get_instance().get_data_path(new_uid)
+ def hard_link_entry(self, new_object_id, existing_object_id):
+ layout_manager = layoutmanager.get_instance()
+ existing_file = layout_manager.get_data_path(existing_object_id)
+ new_file = layout_manager.get_data_path(new_object_id)
logging.debug('removing %r', new_file)
os.remove(new_file)
@@ -183,8 +188,10 @@ class AsyncCopy(object):
self.size = 0
def _cleanup(self):
- os.close(self.src_fp)
- os.close(self.dest_fp)
+ if self.src_fp != -1:
+ os.close(self.src_fp)
+ if self.dest_fp != -1:
+ os.close(self.dest_fp)
os.chmod(self.dest, 0400)
def _copy_block(self, user_data=None):
@@ -208,9 +215,10 @@ class AsyncCopy(object):
self._complete(None)
return False
except Exception, err:
- logging.error('AC: Error copying %s -> %s: %r', self.src, self.
- dest, err)
- self._complete(err)
+ logging.exception('AC: Error copying %s -> %s', self.src,
+ self.dest)
+ self._cleanup()
+ self.completion(err)
return False
return True
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
index 80a1ade..c9cd052 100644
--- a/src/carquinyol/indexstore.py
+++ b/src/carquinyol/indexstore.py
@@ -25,16 +25,18 @@ from xapian import WritableDatabase, Document, Enquire, Query
from carquinyol import layoutmanager
from carquinyol.layoutmanager import MAX_QUERY_LIMIT
-_VALUE_UID = 0
+_VALUE_TREE_ID = 0
_VALUE_TIMESTAMP = 1
_VALUE_TITLE = 2
-# 3 reserved for version support
+_VALUE_VERSION_ID = 3
_VALUE_FILESIZE = 4
_VALUE_CREATION_TIME = 5
_PREFIX_NONE = 'N'
_PREFIX_FULL_VALUE = 'F'
-_PREFIX_UID = 'Q'
+_PREFIX_OBJECT_ID = 'O'
+_PREFIX_TREE_ID = 'Q'
+_PREFIX_VERSION_ID = 'V'
_PREFIX_ACTIVITY = 'A'
_PREFIX_ACTIVITY_ID = 'I'
_PREFIX_MIME_TYPE = 'M'
@@ -51,7 +53,8 @@ _PROPERTIES_NOT_TO_INDEX = ['timestamp', 'preview']
_MAX_RESULTS = int(2 ** 31 - 1)
_QUERY_TERM_MAP = {
- 'uid': _PREFIX_UID,
+ 'tree_id': _PREFIX_TREE_ID,
+ 'version_id': _PREFIX_VERSION_ID,
'activity': _PREFIX_ACTIVITY,
'activity_id': _PREFIX_ACTIVITY_ID,
'mime_type': _PREFIX_MIME_TYPE,
@@ -257,34 +260,36 @@ class IndexStore(object):
for f in os.listdir(index_path):
os.remove(os.path.join(index_path, f))
- def contains(self, uid):
- postings = self._database.postlist(_PREFIX_FULL_VALUE + \
- _PREFIX_UID + uid)
+ def contains(self, object_id):
+ postings = self._database.postlist(self._object_id_term(object_id))
try:
__ = postings.next()
except StopIteration:
return False
return True
- def store(self, uid, properties):
+ def store(self, object_id, properties):
+ tree_id, version_id = object_id
+ id_term = self._object_id_term(object_id)
document = Document()
- document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_TREE_ID, tree_id)
+ document.add_value(_VALUE_VERSION_ID, version_id)
+ document.add_term(id_term)
term_generator = TermGenerator()
term_generator.index_document(document, properties)
- if not self.contains(uid):
- self._database.add_document(document)
+ if self.contains(object_id):
+ self._database.replace_document(id_term, document)
else:
- self._database.replace_document(_PREFIX_FULL_VALUE + \
- _PREFIX_UID + uid, document)
+ self._database.add_document(document)
self._flush()
- def find(self, query):
- offset = query.pop('offset', 0)
- limit = query.pop('limit', MAX_QUERY_LIMIT)
- order_by = query.pop('order_by', [])
- query_string = query.pop('query', None)
+ def find(self, query, query_string, options):
+ offset = options.pop('offset', 0)
+ limit = options.pop('limit', MAX_QUERY_LIMIT)
+ order_by = options.pop('order_by', [])
+ all_versions = options.pop('all_versions', False)
query_parser = QueryParser()
query_parser.set_database(self._database)
@@ -300,38 +305,101 @@ class IndexStore(object):
order_by = order_by[0]
if order_by == '+timestamp':
- enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
- enquire.set_docid_order(False)
+ order_by_value = _VALUE_TIMESTAMP
+ order_by_direction = True
elif order_by == '-timestamp':
- enquire.set_sort_by_value(_VALUE_TIMESTAMP, False)
- enquire.set_docid_order(True)
+ order_by_value = _VALUE_TIMESTAMP
+ order_by_direction = False
elif order_by == '+title':
- enquire.set_sort_by_value(_VALUE_TITLE, True)
+ order_by_value = _VALUE_TITLE
+ order_by_direction = True
elif order_by == '-title':
- enquire.set_sort_by_value(_VALUE_TITLE, False)
+ order_by_value = _VALUE_TITLE
+ order_by_direction = False
elif order_by == '+filesize':
- enquire.set_sort_by_value(_VALUE_FILESIZE, True)
+ order_by_value = _VALUE_FILESIZE
+ order_by_direction = True
elif order_by == '-filesize':
- enquire.set_sort_by_value(_VALUE_FILESIZE, False)
+ order_by_value = _VALUE_FILESIZE
+ order_by_direction = False
elif order_by == '+creation_time':
- enquire.set_sort_by_value(_VALUE_CREATION_TIME, True)
+ order_by_value = _VALUE_CREATION_TIME
+ order_by_direction = True
elif order_by == '-creation_time':
- enquire.set_sort_by_value(_VALUE_CREATION_TIME, False)
+ order_by_value = _VALUE_CREATION_TIME
+ order_by_direction = False
else:
+ order_by_value = _VALUE_TIMESTAMP
+ order_by_direction = True
logging.warning('Unsupported property for sorting: %s', order_by)
order_by = '+timestamp'
- query_result = enquire.get_mset(offset, limit, check_at_least)
- total_count = query_result.get_matches_estimated()
+ logging.debug('order_by=%r, order_by_value=%r, order_by_direction=%r',
+ order_by, order_by_value, order_by_direction)
+ enquire.set_sort_by_value(order_by_value, reverse=order_by_direction)
+ enquire.set_docid_order({True: enquire.DESCENDING,
+ False: enquire.ASCENDING}[order_by_direction])
+
+ if not all_versions:
+ enquire.set_collapse_key(_VALUE_TREE_ID)
+
+ if all_versions or (order_by == '+timestamp'):
+ logging.debug('using Xapian for sorting')
+# query_result = enquire.get_mset(offset, limit, check_at_least)
+ # FIXME: work around Xapian returning incorrect match counts
+ query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT)
+ else:
+ # Xapian doesn't support using a different sort order while
+ # collapsing (which needs to be timestamp in our case), so
+ # we need to query everything and sort+limit ourselves.
+ logging.debug('using Xapian for collapsing only')
+ enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
+ enquire.set_docid_order(enquire.ASCENDING)
+ query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT)
+
+ total_count = query_result.get_matches_lower_bound()
+ documents = [hit.document for hit in query_result]
+
+ if (not all_versions) and (order_by != '+timestamp'):
+ logging.debug('sorting in Python')
+ def _cmp(a, b):
+ value_a = a.get_value(order_by_value)
+ value_b = b.get_value(order_by_value)
+ if value_a < value_b:
+ return -1
+ elif value_a > value_b:
+ return 1
+ elif a.get_docid() < b.get_docid():
+ return -1
+ elif a.get_docid() > b.get_docid():
+ return 1
+ return 0
+
+ documents.sort(cmp=_cmp, reverse=order_by_direction)
+ documents = documents[offset:offset+limit]
+ else:
+ # FIXME: work around Xapian returning incorrect match counts
+ logging.debug('doing offset/limit in Python (%r results, offset %r, limit %r)',
+ len(documents), offset, limit)
+ documents = documents[offset:offset+limit]
+
+ object_ids = []
+ for document in documents:
+ object_ids.append((document.get_value(_VALUE_TREE_ID),
+ document.get_value(_VALUE_VERSION_ID)))
- uids = []
- for hit in query_result:
- uids.append(hit.document.get_value(_VALUE_UID))
+ return (object_ids, total_count)
- return (uids, total_count)
+ def delete(self, object_id):
+ object_id_term = self._object_id_term(object_id)
- def delete(self, uid):
- self._database.delete_document(_PREFIX_FULL_VALUE + _PREFIX_UID + uid)
+ enquire = Enquire(self._database)
+ enquire.set_query(Query(object_id_term))
+ query_results = enquire.get_mset(0, 2, 2)
+ documents = [hit.document for hit in query_results]
+ assert len(documents) == 1
+
+ self._database.delete_document(object_id_term)
self._flush()
def get_activities(self):
@@ -341,6 +409,9 @@ class IndexStore(object):
activities.append(term.term[len(prefix):])
return activities
+ def _object_id_term(self, object_id):
+ return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id
+
def flush(self):
self._flush(True)
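
Note (not part of the patch): set_collapse_key(_VALUE_TREE_ID) above asks
Xapian to keep at most one document per tree_id, which combined with the
timestamp sort yields "latest version per tree". A pure-Python sketch of the
same semantics over hypothetical (tree_id, version_id, timestamp) tuples:

    def collapse_to_latest(hits):
        """Keep only the newest hit per tree_id, newest-first overall."""
        latest = {}
        for tree_id, version_id, timestamp in hits:
            current = latest.get(tree_id)
            if current is None or timestamp > current[2]:
                latest[tree_id] = (tree_id, version_id, timestamp)
        return sorted(latest.values(), key=lambda hit: hit[2], reverse=True)

    hits = [('tree-a', 'v1', 100), ('tree-a', 'v2', 200), ('tree-b', 'v1', 150)]
    print collapse_to_latest(hits)
    # -> [('tree-a', 'v2', 200), ('tree-b', 'v1', 150)]
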
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index 3179a98..afb9e42 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -58,15 +58,21 @@ class LayoutManager(object):
version_path = os.path.join(self._root_path, 'version')
open(version_path, 'w').write(str(version))
- def get_entry_path(self, uid):
+ def get_entry_path(self, object_id):
# os.path.join() is just too slow
- return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+ tree_id, version_id = object_id
+ return '%s/%s/%s/%s' % (self._root_path, tree_id[:2], tree_id,
+ version_id)
- def get_data_path(self, uid):
- return '%s/%s/%s/data' % (self._root_path, uid[:2], uid)
+ def get_data_path(self, object_id):
+ tree_id, version_id = object_id
+ return '%s/%s/%s/%s/data' % (self._root_path, tree_id[:2], tree_id,
+ version_id)
- def get_metadata_path(self, uid):
- return '%s/%s/%s/metadata' % (self._root_path, uid[:2], uid)
+ def get_metadata_path(self, object_id):
+ tree_id, version_id = object_id
+ return '%s/%s/%s/%s/metadata' % (self._root_path, tree_id[:2], tree_id,
+ version_id)
def get_root_path(self):
return self._root_path
@@ -81,13 +87,24 @@ class LayoutManager(object):
return os.path.join(self.get_checksums_dir(), 'queue')
def find_all(self):
- uids = []
- for f in os.listdir(self._root_path):
- if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
- for g in os.listdir(os.path.join(self._root_path, f)):
- if len(g) == 36:
- uids.append(g)
- return uids
+ object_ids = []
+ for tree_2 in os.listdir(self._root_path):
+ tree_2_path = os.path.join(self._root_path, tree_2)
+ if not (os.path.isdir(tree_2_path) and len(tree_2) == 2):
+ continue
+
+ for tree_id in os.listdir(tree_2_path):
+ if len(tree_id) != 36:
+ continue
+
+ tree_path = os.path.join(tree_2_path, tree_id)
+ for version_id in os.listdir(tree_path):
+ if len(version_id) != 36:
+ continue
+
+ object_ids.append((tree_id, version_id))
+
+ return object_ids
def is_empty(self):
"""Check if there is any existing entry.
@@ -98,13 +115,21 @@ class LayoutManager(object):
# unmigrated 0.82 data store
return False
- for f in os.listdir(self._root_path):
- if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
- for g in os.listdir(os.path.join(self._root_path, f)):
- if len(g) == 36:
- return False
+ for tree_2 in os.listdir(self._root_path):
+ tree_2_path = os.path.join(self._root_path, tree_2)
+ if not (os.path.isdir(tree_2_path) and len(tree_2) == 2):
+ continue
+
+ for tree_id in os.listdir(tree_2_path):
+ if len(tree_id) != 36:
+ continue
+
+ return False
+
return True
+
_instance = None
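
Note (not part of the patch): the layout gains one directory level, moving from
<root>/<uid[:2]>/<uid> to <root>/<tree_id[:2]>/<tree_id>/<version_id>. With
made-up identifiers:

    root = '/home/olpc/.sugar/default/datastore'   # hypothetical root path
    tree_id = '0f64410e-37a2-4b36-b7be-c09b3b82de43'
    version_id = '7c1dfdbb-9f6c-4c25-8fe6-5a24d5e6b0ad'

    entry_path = '%s/%s/%s/%s' % (root, tree_id[:2], tree_id, version_id)
    data_path = entry_path + '/data'          # the entry's file, if any
    metadata_path = entry_path + '/metadata'  # one file per property
    # entry_path == '.../datastore/0f/0f64410e-.../7c1dfdbb-...'
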
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 5967017..c8ba56d 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -9,8 +9,9 @@ _INTERNAL_KEYS = ['checksum']
class MetadataStore(object):
- def store(self, uid, metadata):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+ def store(self, object_id, metadata):
+ layout_manager = layoutmanager.get_instance()
+ metadata_path = layout_manager.get_metadata_path(object_id)
if not os.path.exists(metadata_path):
os.makedirs(metadata_path)
else:
@@ -18,14 +19,10 @@ class MetadataStore(object):
if key not in _INTERNAL_KEYS:
os.remove(os.path.join(metadata_path, key))
- metadata['uid'] = uid
+ tree_id, version_id = object_id
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
for key, value in metadata.items():
-
- # Hack to support activities that still pass properties named as
- # for example title:text.
- if ':' in key:
- key = key.split(':', 1)[0]
-
f = open(os.path.join(metadata_path, key), 'w')
try:
if isinstance(value, unicode):
@@ -36,25 +33,31 @@ class MetadataStore(object):
finally:
f.close()
- def retrieve(self, uid, properties=None):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+ def retrieve(self, object_id, properties=None):
+ layout_manager = layoutmanager.get_instance()
+ metadata_path = layout_manager.get_metadata_path(object_id)
+ if properties:
+ return metadatareader.retrieve(metadata_path, list(properties))
return metadatareader.retrieve(metadata_path, properties)
- def delete(self, uid):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+ def delete(self, object_id):
+ layout_manager = layoutmanager.get_instance()
+ metadata_path = layout_manager.get_metadata_path(object_id)
for key in os.listdir(metadata_path):
os.remove(os.path.join(metadata_path, key))
os.rmdir(metadata_path)
- def get_property(self, uid, key):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+ def get_property(self, object_id, key):
+ layout_manager = layoutmanager.get_instance()
+ metadata_path = layout_manager.get_metadata_path(object_id)
property_path = os.path.join(metadata_path, key)
if os.path.exists(property_path):
return open(property_path, 'r').read()
else:
return None
- def set_property(self, uid, key, value):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+ def set_property(self, object_id, key, value):
+ layout_manager = layoutmanager.get_instance()
+ metadata_path = layout_manager.get_metadata_path(object_id)
property_path = os.path.join(metadata_path, key)
open(property_path, 'w').write(value)
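
Note (not part of the patch): MetadataStore keeps one flat file per property
under the entry's metadata/ directory, now including tree_id and version_id
themselves. A stripped-down sketch of that storage scheme:

    import os

    def write_metadata(metadata_path, metadata):
        # One file per property: the file name is the key, the content the value.
        if not os.path.exists(metadata_path):
            os.makedirs(metadata_path)
        for key, value in metadata.items():
            f = open(os.path.join(metadata_path, key), 'w')
            try:
                f.write(str(value))
            finally:
                f.close()

    def read_metadata(metadata_path):
        metadata = {}
        for key in os.listdir(metadata_path):
            metadata[key] = open(os.path.join(metadata_path, key)).read()
        return metadata
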
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
index 1745f2c..d36fb09 100644
--- a/src/carquinyol/migration.py
+++ b/src/carquinyol/migration.py
@@ -23,10 +23,12 @@ import shutil
import time
import cjson
+import uuid
-from carquinyol import layoutmanager
+from carquinyol import layoutmanager, metadatastore
DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
+mstore = metadatastore.MetadataStore()
def migrate_from_0():
@@ -38,22 +40,24 @@ def migrate_from_0():
return
for f in os.listdir(old_root_path):
- uid, ext = os.path.splitext(f)
+ tree_id, ext = os.path.splitext(f)
if ext != '.metadata':
continue
- logging.debug('Migrating entry %r', uid)
-
- new_entry_dir = layoutmanager.get_instance().get_metadata_path(uid)
+ logging.debug('Migrating entry %r', tree_id)
+ version_id = _gen_uuid()
+ object_id = (tree_id, version_id)
+ new_entry_dir = layoutmanager.get_instance().get_metadata_path(
+ object_id)
if not os.path.exists(new_entry_dir):
os.makedirs(new_entry_dir)
try:
- _migrate_metadata(root_path, old_root_path, uid)
- _migrate_file(root_path, old_root_path, uid)
- _migrate_preview(root_path, old_root_path, uid)
+ _migrate_metadata_0(root_path, old_root_path, object_id)
+ _migrate_file_0(root_path, old_root_path, object_id)
+ _migrate_preview_0(root_path, old_root_path, object_id)
except Exception:
- logging.exception('Error while migrating entry %r', uid)
+ logging.exception('Error while migrating entry %r', object_id)
# Just be paranoid, it's cheap.
if old_root_path.endswith('datastore/store'):
@@ -62,13 +66,16 @@ def migrate_from_0():
logging.info('Migration finished')
-def _migrate_metadata(root_path, old_root_path, uid):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
- old_metadata_path = os.path.join(old_root_path, uid + '.metadata')
+def _migrate_metadata_0(root_path, old_root_path, object_id):
+ tree_id, version_id = object_id
+ dir_path = layoutmanager.get_instance().get_entry_path(object_id)
+ metadata_path = layoutmanager.get_instance().get_metadata_path(object_id)
+ old_metadata_path = os.path.join(old_root_path, tree_id + '.metadata')
metadata = cjson.decode(open(old_metadata_path, 'r').read())
- if 'uid' not in metadata:
- metadata['uid'] = uid
+ metadata.pop('uid', None)
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
if 'timestamp' not in metadata and 'mtime' in metadata:
metadata['timestamp'] = \
@@ -87,17 +94,89 @@ def _migrate_metadata(root_path, old_root_path, uid):
f.close()
except Exception:
logging.exception(
- 'Error while migrating property %s of entry %s', key, uid)
+ 'Error while migrating property %r of entry %r', key,
+ object_id)
-def _migrate_file(root_path, old_root_path, uid):
- if os.path.exists(os.path.join(old_root_path, uid)):
- new_data_path = layoutmanager.get_instance().get_data_path(uid)
- os.rename(os.path.join(old_root_path, uid),
+def _migrate_file_0(root_path, old_root_path, object_id):
+ tree_id, version_id_ = object_id
+ if os.path.exists(os.path.join(old_root_path, tree_id)):
+ new_data_path = layoutmanager.get_instance().get_data_path(object_id)
+ os.rename(os.path.join(old_root_path, tree_id),
new_data_path)
-def _migrate_preview(root_path, old_root_path, uid):
- metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
- os.rename(os.path.join(old_root_path, 'preview', uid),
+def _migrate_preview_0(root_path, old_root_path, object_id):
+ tree_id, version_id_ = object_id
+ dir_path = layoutmanager.get_instance().get_entry_path(object_id)
+ metadata_path = layoutmanager.get_instance().get_metadata_path(object_id)
+ os.rename(os.path.join(old_root_path, 'preview', tree_id),
os.path.join(metadata_path, 'preview'))
+
+
+def migrate_from_1():
+ logging.info('Migrating datastore from version 1, 2 or 4')
+
+ layout_manager = layoutmanager.get_instance()
+ root_path = layout_manager.get_root_path()
+ checksum_path = layout_manager.get_checksums_dir()
+
+ version_ids = {}
+ for tree_2 in os.listdir(root_path):
+ if len(tree_2) != 2:
+ continue
+
+ for tree_id in os.listdir(os.path.join(root_path, tree_2)):
+ if len(tree_id) != 36:
+ continue
+
+ logging.debug('Migrating entry %r', tree_id)
+
+ version_id = _gen_uuid()
+ version_ids[tree_id] = version_id
+ try:
+ old_path = os.path.join(root_path, tree_2, tree_id)
+ tmp_path = os.path.join(root_path, tree_2, tree_id+'.tmp')
+ os.rename(old_path, tmp_path)
+ try:
+ new_path = layout_manager.get_entry_path((tree_id, version_id))
+ new_metadata_path = layout_manager.get_metadata_path((tree_id,
+ version_id))
+ os.makedirs(os.path.dirname(new_path))
+ os.rename(tmp_path, new_path)
+ except:
+ os.rename(tmp_path, old_path)
+ raise
+
+ metadata = mstore.retrieve((tree_id, version_id))
+ metadata.pop('uid', None)
+ metadata['tree_id'] = tree_id
+ metadata['version_id'] = version_id
+ metadata['parent_id'] = ''
+ metadata['timestamp'] = metadata.get('timestamp',
+ os.stat(new_path).st_ctime)
+ mstore.store((tree_id, version_id), metadata)
+
+ except Exception:
+ logging.exception('Error while migrating entry %r', tree_id)
+
+ for checksum in os.listdir(checksum_path):
+ entries_path = os.path.join(checksum_path, checksum)
+ for tree_id in os.listdir(entries_path):
+ if len(tree_id) != 36:
+ continue
+
+ try:
+ os.rename(os.path.join(entries_path, tree_id),
+ os.path.join(entries_path, '%s,%s' % (tree_id,
+ version_ids[tree_id])))
+
+ except Exception:
+ logging.exception('Error while migrating checksum entry '
+ '%r / %r', checksum, tree_id)
+
+ logging.info('Migration finished')
+
+
+def _gen_uuid():
+ return str(uuid.uuid4())
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index c038c2b..48b05a6 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -33,36 +33,36 @@ class Optimizer(object):
self._metadata_store = metadata_store
self._enqueue_checksum_id = None
- def optimize(self, uid):
+ def optimize(self, object_id):
"""Add an entry to a queue of entries to be checked for duplicates.
"""
- if not os.path.exists(self._file_store.get_file_path(uid)):
+ if not os.path.exists(self._file_store.get_file_path(object_id)):
return
- queue_path = layoutmanager.get_instance().get_queue_path()
- open(os.path.join(queue_path, uid), 'w').close()
- logging.debug('optimize %r', os.path.join(queue_path, uid))
+ entry_path = self._queue_entry_path(object_id)
+ open(entry_path, 'w').close()
+ logging.debug('queueing %r for optimization', entry_path)
if self._enqueue_checksum_id is None:
self._enqueue_checksum_id = \
gobject.idle_add(self._process_entry_cb,
priority=gobject.PRIORITY_LOW)
- def remove(self, uid):
+ def remove(self, object_id):
"""Remove any structures left from space optimization
"""
- checksum = self._metadata_store.get_property(uid, 'checksum')
+ checksum = self._metadata_store.get_property(object_id, 'checksum')
if checksum is None:
return
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- checksum_entry_path = os.path.join(checksum_path, uid)
+ checksum_entry_path = self._checksum_entry_path(checksum, object_id)
if os.path.exists(checksum_entry_path):
- logging.debug('remove %r', checksum_entry_path)
+ logging.debug('removing %r', checksum_entry_path)
os.remove(checksum_entry_path)
if os.path.exists(checksum_path):
@@ -73,6 +73,14 @@ class Optimizer(object):
if e.errno != errno.ENOTEMPTY:
raise
+ def _queue_entry_path(self, object_id):
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ return os.path.join(queue_path, '%s,%s' % object_id)
+
+ def _checksum_entry_path(self, checksum, object_id):
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ return os.path.join(checksums_dir, checksum, '%s,%s' % object_id)
+
def _identical_file_already_exists(self, checksum):
"""Check if we already have files with this checksum.
@@ -81,14 +89,14 @@ class Optimizer(object):
checksum_path = os.path.join(checksums_dir, checksum)
return os.path.exists(checksum_path)
- def _get_uid_from_checksum(self, checksum):
+ def _get_object_id_from_checksum(self, checksum):
"""Get an existing entry which file matches checksum.
"""
checksums_dir = layoutmanager.get_instance().get_checksums_dir()
checksum_path = os.path.join(checksums_dir, checksum)
- first_uid = os.listdir(checksum_path)[0]
- return first_uid
+ first_object_id = os.listdir(checksum_path)[0].split(',')
+ return tuple(first_object_id)
def _create_checksum_dir(self, checksum):
"""Create directory that tracks files with this same checksum.
@@ -99,24 +107,21 @@ class Optimizer(object):
logging.debug('create dir %r', checksum_path)
os.mkdir(checksum_path)
- def _add_checksum_entry(self, uid, checksum):
- """Create a file in the checksum dir with the uid of the entry
+ def _add_checksum_entry(self, object_id, checksum):
+ """Create a file in the checksum dir with the object_id of the entry
"""
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
-
- logging.debug('touch %r', os.path.join(checksum_path, uid))
- open(os.path.join(checksum_path, uid), 'w').close()
+ checksum_entry_path = self._checksum_entry_path(checksum, object_id)
+ logging.debug('touch %r', checksum_entry_path)
+ open(checksum_entry_path, 'w').close()
- def _already_linked(self, uid, checksum):
+ def _already_linked(self, object_id, checksum):
"""Check if this entry's file is already a hard link to the checksums
dir.
"""
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(os.path.join(checksum_path, uid))
+ checksum_entry_path = self._checksum_entry_path(checksum, object_id)
+ return os.path.exists(checksum_entry_path)
def _process_entry_cb(self):
"""Process one item in the checksums queue by calculating its checksum,
@@ -127,30 +132,31 @@ class Optimizer(object):
queue_path = layoutmanager.get_instance().get_queue_path()
queue = os.listdir(queue_path)
if queue:
- uid = queue[0]
- logging.debug('_process_entry_cb processing %r', uid)
+ object_id = tuple(queue[0].split(','))
+ logging.debug('_process_entry_cb processing %r', object_id)
- file_in_entry_path = self._file_store.get_file_path(uid)
+ file_in_entry_path = self._file_store.get_file_path(object_id)
if not os.path.exists(file_in_entry_path):
- logging.info('non-existent entry in queue: %r', uid)
+ logging.info('non-existent entry in queue: %r', object_id)
else:
- checksum = self._calculate_md5sum(file_in_entry_path)
- self._metadata_store.set_property(uid, 'checksum', checksum)
+ checksum = self.calculate_md5sum(file_in_entry_path)
+ self._metadata_store.set_property(object_id, 'checksum',
+ checksum)
if self._identical_file_already_exists(checksum):
- if not self._already_linked(uid, checksum):
- existing_entry_uid = \
- self._get_uid_from_checksum(checksum)
+ if not self._already_linked(object_id, checksum):
+ existing_entry_object_id = \
+ self._get_object_id_from_checksum(checksum)
- self._file_store.hard_link_entry(uid,
- existing_entry_uid)
+ self._file_store.hard_link_entry(object_id,
+ existing_entry_object_id)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(object_id, checksum)
else:
self._create_checksum_dir(checksum)
- self._add_checksum_entry(uid, checksum)
+ self._add_checksum_entry(object_id, checksum)
- os.remove(os.path.join(queue_path, uid))
+ os.remove(self._queue_entry_path(object_id))
if len(queue) <= 1:
self._enqueue_checksum_id = None
@@ -158,7 +164,7 @@ class Optimizer(object):
else:
return True
- def _calculate_md5sum(self, path):
+ def calculate_md5sum(self, path):
"""Calculate the md5 checksum of a given file.
"""