Diffstat (limited to 'src/carquinyol')
-rw-r--r-- | src/carquinyol/datastore.py     | 698
-rw-r--r-- | src/carquinyol/filestore.py     |  48
-rw-r--r-- | src/carquinyol/indexstore.py    | 143
-rw-r--r-- | src/carquinyol/layoutmanager.py |  61
-rw-r--r-- | src/carquinyol/metadatastore.py |  37
-rw-r--r-- | src/carquinyol/migration.py     | 123
-rw-r--r-- | src/carquinyol/optimizer.py     |  82
7 files changed, 846 insertions, 346 deletions
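
For orientation before the patch itself: the changes below replace the flat, uid-keyed D-Bus API at /org/laptop/sugar/DataStore with a versioned API on the new object path /org/laptop/sugar/DataStore2 (interface org.laptop.sugar.DataStore2), keeping a compatibility shim for old clients. The following is a minimal client-side sketch of how the new calls fit together; the method names and signatures are taken from the diff, while the bus wiring, file paths and metadata values are purely illustrative:

import dbus

bus = dbus.SessionBus()
data_store = dbus.Interface(
    bus.get_object('org.laptop.sugar.DataStore',
                   '/org/laptop/sugar/DataStore2'),
    'org.laptop.sugar.DataStore2')

# Empty tree_id and parent_id ask the data store to allocate both;
# this is the equivalent of create() in the old API.
tree_id, version_id = data_store.save(
    '', '', {'title': 'example entry'}, '/tmp/example.txt', False)

# Store a new version on top of the one just created; parent_id is the
# version_id of the predecessor.
tree_id, version_id = data_store.save(
    tree_id, version_id, {'title': 'example entry, revised'},
    '/tmp/example-v2.txt', False)

# '+timestamp' sorts newest first in this patch; without the
# 'all_versions' option only the latest version per tree is returned.
entries, count = data_store.find(
    {'tree_id': tree_id},
    {'order_by': ['+timestamp'],
     'metadata': ['tree_id', 'version_id', 'title']})

# Hard-link a readable copy of the stored data and return its path.
path = data_store.get_data(tree_id, version_id)
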
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py index 0fc8be1..bc8e208 100644 --- a/src/carquinyol/datastore.py +++ b/src/carquinyol/datastore.py @@ -25,9 +25,11 @@ import os import dbus import dbus.service +import gconf import gobject from sugar import mime +from sugar.logger import trace from carquinyol import layoutmanager from carquinyol import migration @@ -41,12 +43,207 @@ from carquinyol.optimizer import Optimizer DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' DS_SERVICE = "org.laptop.sugar.DataStore" -DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" -DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" +DS_DBUS_INTERFACE1 = "org.laptop.sugar.DataStore" +DS_OBJECT_PATH1 = "/org/laptop/sugar/DataStore" +DS_DBUS_INTERFACE2 = "org.laptop.sugar.DataStore2" +DS_OBJECT_PATH2 = "/org/laptop/sugar/DataStore2" logger = logging.getLogger(DS_LOG_CHANNEL) +class DBusAPIv1(dbus.service.Object): + """Compatibility layer for old (unversioned) D-Bus API + """ + + # pylint: disable-msg=W0212 + + def __init__(self, data_store): + self._data_store = data_store + bus_name = dbus.service.BusName(DS_SERVICE, bus=dbus.SessionBus(), + replace_existing=False, allow_replacement=False) + dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH1) + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='a{sv}sb', out_signature='s', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def create(self, props, file_path, transfer_ownership, + async_cb, async_err_cb): + + def success_cb(tree_id, child_id): + self.Created(tree_id) + async_cb(tree_id) + + self._data_store._save(tree_id='', parent_id='', metadata=props, + path=file_path, delete_after=transfer_ownership, + async_cb=success_cb, async_err_cb=async_err_cb) + + @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s") + def Created(self, uid): + pass + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def update(self, uid, props, file_path, transfer_ownership, + async_cb, async_err_cb): + + def success_cb(tree_id, child_id): + self.Updated(tree_id) + async_cb() + + latest_versions = self._get_latest(uid) + if not latest_versions: + raise ValueError('Trying to update non-existant entry %s - wanted' + ' to use create()?' 
% (uid, )) + + parent = latest_versions[0] + if self._compare_checksums(parent, file_path): + self._data_store._change_metadata(parent['tree_id'], + parent['version_id'], props) + return success_cb(uid, None) + + self._data_store._save(tree_id=uid, + parent_id=parent['version_id'], metadata=props, + path=file_path, delete_after=transfer_ownership, + async_cb=success_cb, async_err_cb=async_err_cb) + + @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s") + def Updated(self, uid): + pass + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='a{sv}as', + out_signature='aa{sv}u') + def find(self, query, properties): + if 'uid' in properties: + properties.append('tree_id') + properties.remove('uid') + + options = {'metadata': properties} + for name in ['offset', 'limit', 'order_by']: + if name in query: + options[name] = query.pop(name) + + if 'uid' in query: + query['tree_id'] = query.pop('uid') + + results, count = self._data_store._find(query, options, + query.get('query')) + + if 'tree_id' in properties: + for entry in results: + entry['uid'] = entry.pop('tree_id') + + return results, count + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='s', + out_signature='s', + sender_keyword='sender') + def get_filename(self, uid, sender=None): + latest_versions = self._get_latest(uid) + if not latest_versions: + raise ValueError('Entry %s does not exist' % (uid, )) + + return self._data_store._get_data(uid, + latest_versions[0]['version_id'], sender=sender) + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='s', + out_signature='a{sv}') + def get_properties(self, uid): + latest_versions = self._get_latest(uid) + if not latest_versions: + raise ValueError('Entry %s does not exist' % (uid, )) + + latest_versions[0]['uid'] = latest_versions[0].pop('tree_id') + del latest_versions[0]['version_id'] + return latest_versions[0] + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='sa{sv}', + out_signature='as') + def get_uniquevaluesfor(self, propertyname, query=None): + return self._data_store._find_unique_values(query, propertyname) + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature='s', + out_signature='') + def delete(self, uid): + latest_versions = self._get_latest(uid) + if not latest_versions: + raise ValueError('Entry %s does not exist' % (uid, )) + + self._data_store._delete(tree_id=uid, + version_id=latest_versions[0]['version_id']) + self.Deleted(uid) + + @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s") + def Deleted(self, uid): + pass + + def stop(self): + """shutdown the service""" + self._data_store._stop() + self.Stopped() + + @dbus.service.signal(DS_DBUS_INTERFACE1) + def Stopped(self): + pass + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature="sa{sv}", + out_signature='s') + def mount(self, uri, options=None): + return '' + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature="", + out_signature="aa{sv}") + def mounts(self): + return [{'id': 1}] + + @dbus.service.method(DS_DBUS_INTERFACE1, + in_signature="s", + out_signature="") + def unmount(self, mountpoint_id): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}") + def Mounted(self, descriptior): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}") + def Unmounted(self, descriptor): + pass + + def _get_latest(self, uid): + return self._data_store._find({'tree_id': uid}, + {'limit': 1, 'order_by': ['+timestamp']})[0] + + @trace() + def _compare_checksums(self, parent, child_data): + parent_data = self._data_store._file_store.get_file_path( 
+ (parent['tree_id'], parent['version_id'])) + if bool(child_data) ^ os.path.exists(parent_data): + return False + elif not child_data: + return True + + if 'checksum' in parent: + parent_checksum = parent['checksum'] + else: + parent_checksum = self._data_store._optimizer.calculate_md5sum( + parent_data) + + child_checksum = self._data_store._optimizer.calculate_md5sum( + child_data) + return parent_checksum == child_checksum + + class DataStore(dbus.service.Object): """D-Bus API and logic for connecting all the other components. """ @@ -56,7 +253,12 @@ class DataStore(dbus.service.Object): bus=dbus.SessionBus(), replace_existing=False, allow_replacement=False) - dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) + dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH2) + self._api_v1 = DBusAPIv1(self) + gconf_client = gconf.client_get_default() + self._max_versions = gconf_client.get_int( + '/desktop/sugar/datastore/max_versions') + logging.debug('max_versions=%r', self._max_versions) migrated, initiated = self._open_layout() @@ -104,7 +306,7 @@ class DataStore(dbus.service.Object): if old_version == 0: migration.migrate_from_0() - elif old_version in [1, 2, 4] + elif old_version in [1, 2, 4]: migration.migrate_from_1() layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION) @@ -119,26 +321,26 @@ class DataStore(dbus.service.Object): def _update_index(self): """Find entries that are not yet in the index and add them.""" - uids = layoutmanager.get_instance().find_all() + object_ids = layoutmanager.get_instance().find_all() logging.debug('Going to update the index with object_ids %r', - uids) + object_ids) self._index_updating = True - gobject.idle_add(lambda: self.__update_index_cb(uids), + gobject.idle_add(lambda: self.__update_index_cb(object_ids), priority=gobject.PRIORITY_LOW) - def __update_index_cb(self, uids): - if uids: - uid = uids.pop() + def __update_index_cb(self, object_ids): + if object_ids: + object_id = object_ids.pop() - logging.debug('Updating entry %r in index. %d to go.', uid, - len(uids)) + logging.debug('Updating entry %r in index. 
%d to go.', object_id, + len(object_ids)) - if not self._index_store.contains(uid): + if not self._index_store.contains(object_id): try: update_metadata = False - props = self._metadata_store.retrieve(uid) + props = self._metadata_store.retrieve(object_id) if 'filesize' not in props: - path = self._file_store.get_file_path(uid) + path = self._file_store.get_file_path(object_id) if os.path.exists(path): props['filesize'] = os.stat(path).st_size update_metadata = True @@ -157,12 +359,12 @@ class DataStore(dbus.service.Object): props['creation_time'] = props['timestamp'] update_metadata = True if update_metadata: - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) + self._metadata_store.store(object_id, props) + self._index_store.store(object_id, props) except Exception: - logging.exception('Error processing %r', uid) + logging.exception('Error processing %r', object_id) - if not uids: + if not object_ids: self._index_store.flush() self._index_updating = False logging.debug('Finished updating index.') @@ -170,199 +372,314 @@ class DataStore(dbus.service.Object): else: return True - def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb, - async_err_cb, uid, exc) - if exc is not None: - async_err_cb(exc) - return + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='ssa{sv}sb', + out_signature='ss', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def save(self, tree_id, parent_id, metadata, path, delete_after, async_cb, + async_err_cb): + """Store new (version of an) object and return (tree_id, child_id). + + Submit new version (data and/or metadata) of the given object, + returning (tree_id, child_id). parent_id and child_id are the + version_id of the predecessor resp. the to-be-saved entry. + + If tree_id is empty parent_id must be empty as well and both will be + newly allocated (equivalent to create() in previous API). If only + parent_id is empty there must not be any entry with the same tree_id + already in datastore. + + File storage happens synchronously, emitting the signal + Saved(tree_id, parent_id, child_id) on completion (i.e. before + returning from save()). + + If path is non-empty it designates a file or directory containing the + data and the parent must contain of the same type (file / directory; + not metadata-only), if a parent exists at all. If path is empty and + the parent contained data then a metadata-only update is performed, + reusing the data of the parent. If both parent_id and path are empty + a metadata-only entry is created. + + If delete_after is True the contents of path are deleted after they + have been committed, but before sending the Saved() signal. + + metadata may contain a version_id field that will be used for the new + entry. In that case tree_id must be non-empty and there must not be an + existing entry in the data store with the same tree_id and version_id. + This functionality is intended for restoring from backups and importing + entries from other data stores; most API clients should leave it up to + the data store to choose a unique version_id. 
+ """ + self._save(tree_id, parent_id, metadata, path, delete_after, async_cb, + async_err_cb) - self.Created(uid) - self._optimizer.optimize(uid) - logger.debug('created %s', uid) - async_cb(uid) + @trace(logger=logger) + def _save(self, tree_id, parent_id, metadata, path, delete_after, async_cb, + async_err_cb): + if path: + if not os.access(path, os.R_OK): + raise ValueError('Invalid path given.') - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}sb', - out_signature='s', - async_callbacks=('async_cb', 'async_err_cb'), - byte_arrays=True) - def create(self, props, file_path, transfer_ownership, - async_cb, async_err_cb): - uid = str(uuid.uuid4()) - logging.debug('datastore.create %r', uid) + if delete_after and not os.access(os.path.dirname(path), os.W_OK): + raise ValueError('Deletion requested for read-only directory') + + if (not tree_id) and parent_id: + raise ValueError('tree_id is empty but parent_id is not') + + if tree_id and not parent_id: + if self._find({'tree_id': tree_id}, {'limit': 1})[1]: + raise ValueError('No parent_id given but tree_id already ' + 'exists') + + elif parent_id: + if not self._find({'tree_id': tree_id, 'version_id': parent_id}, + {'limit': 1})[1]: + raise ValueError('Given parent_id does not exist') + + if not tree_id: + tree_id = self._gen_uuid() + + child_id = metadata.get('version_id') + if not child_id: + child_id = self._gen_uuid() + elif not tree_id: + raise ValueError('No tree_id given but metadata contains version_id') + elif self._find({'tree_id': tree_id, 'version_id': child_id}, + {'limit': 1})[1]: + + raise ValueError('There is an existing entry with the same tree_id' + ' and version_id') - if not props.get('timestamp', ''): - props['timestamp'] = int(time.time()) + if not tree_id: + tree_id = self._gen_uuid() + + metadata.setdefault('timestamp', time.time()) + metadata['parent_id'] = parent_id # FIXME: Support for the deprecated ctime property. Remove in 0.92. - if 'ctime' in props: + if 'ctime' in metadata: try: - props['creation_time'] = time.mktime(time.strptime( - migration.DATE_FORMAT, props['ctime'])) + metadata['creation_time'] = time.mktime(time.strptime( + migration.DATE_FORMAT, metadata['ctime'])) except (TypeError, ValueError): pass - if 'creation_time' not in props: - props['creation_time'] = props['timestamp'] - - if os.path.exists(file_path): - stat = os.stat(file_path) - props['filesize'] = stat.st_size - else: - props['filesize'] = 0 + if 'creation_time' not in metadata: + metadata['creation_time'] = metadata['timestamp'] - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, - lambda * args: self._create_completion_cb(async_cb, - async_err_cb, - uid, - * args)) + # FIXME: how to handle creation_time vs. versions? 
- @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Created(self, uid): - pass + if os.path.exists(path): + stat = os.stat(path) + metadata['filesize'] = stat.st_size + else: + metadata['filesize'] = 0 + + object_id = (tree_id, child_id) + self._metadata_store.store(object_id, metadata) + self._index_store.store(object_id, metadata) + + if (not path) and self._file_store.has_data((tree_id, parent_id)): + # metadata-only update, reuse parent data + path = self._file_store.retrieve((tree_id, parent_id), + os.getuid(), 'update') + delete_after = True + + if path: + self._file_store.store(object_id, path, delete_after, + lambda *args, **kwargs: self._save_completion_cb( + tree_id, parent_id, child_id, metadata, + async_cb, async_err_cb, **kwargs)) + else: + self._save_completion_cb(tree_id, parent_id, child_id, metadata, + async_cb, async_err_cb) - def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug('_update_completion_cb() called with %r / %r, exc %r', - async_cb, async_err_cb, exc) + @trace(logger=logger) + def _save_completion_cb(self, tree_id, parent_id, child_id, metadata, + async_cb, async_err_cb, exc=None): + object_id = (tree_id, child_id) if exc is not None: async_err_cb(exc) return - self.Updated(uid) - self._optimizer.optimize(uid) - logger.debug('updated %s', uid) - async_cb() + self._check_max_versions(tree_id) + self.Saved(tree_id, parent_id, child_id, metadata) + self._optimizer.optimize(object_id) + async_cb(tree_id, child_id) - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}sb', - out_signature='', - async_callbacks=('async_cb', 'async_err_cb'), - byte_arrays=True) - def update(self, uid, props, file_path, transfer_ownership, - async_cb, async_err_cb): - logging.debug('datastore.update %r', uid) + def _check_max_versions(self, tree_id): + if not self._max_versions: + return - if not props.get('timestamp', ''): - props['timestamp'] = int(time.time()) + old_versions = self._find({'tree_id': tree_id}, + {'all_versions': True, 'offset': self._max_versions, + 'metadata': ['tree_id', 'version_id', 'timestamp'], + 'order_by': ['+timestamp']})[0] + logging.info('Deleting old versions: %r', old_versions) + for entry in old_versions: + self._delete(entry['tree_id'], entry['version_id']) - # FIXME: Support for the deprecated ctime property. Remove in 0.92. - if 'ctime' in props: - try: - props['creation_time'] = time.mktime(time.strptime( - migration.DATE_FORMAT, props['ctime'])) - except (TypeError, ValueError): - pass + @dbus.service.signal(DS_DBUS_INTERFACE2, signature='sssa{sv}') + def Saved(self, tree_id, parent_id, child_id, metadata): + """Entry created or updated. 
- if 'creation_time' not in props: - props['creation_time'] = props['timestamp'] - - if file_path: - # Empty file_path means skipping storage stage, see filestore.py - # TODO would be more useful to update filesize after real file save - if os.path.exists(file_path): - stat = os.stat(file_path) - props['filesize'] = stat.st_size - else: - props['filesize'] = 0 - - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - - if os.path.exists(self._file_store.get_file_path(uid)) and \ - (not file_path or os.path.exists(file_path)): - self._optimizer.remove(uid) - self._file_store.store(uid, file_path, transfer_ownership, - lambda * args: self._update_completion_cb(async_cb, - async_err_cb, - uid, - * args)) - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Updated(self, uid): + Entry identified by object_id has been saved to permanent storage. + """ pass - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}as', - out_signature='aa{sv}u') - def find(self, query, properties): - logging.debug('datastore.find %r', query) - t = time.time() + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='ssa{sv}', + out_signature='', + byte_arrays=True) + def change_metadata(self, tree_id, version_id, metadata): + """Change metadata of existing entry. + + Changes the metadata of the object identified by object_id to match + metadata. Fully synchronous, no return value. + Emits signal ChangedMetadata(tree_id, version_id, metadata). + """ + self._change_metadata(tree_id, version_id, metadata) + + @trace(logger=logger) + def _change_metadata(self, tree_id, version_id, metadata): + object_id = (tree_id, version_id) + metadata.setdefault('timestamp', time.time()) + self._metadata_store.store(object_id, metadata) + self._index_store.store(object_id, metadata) + self.ChangedMetadata(tree_id, version_id, metadata) + + @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssa{sv}') + def ChangedMetadata(self, tree_id, version_id, metadata): + """Metadata of entry changed.""" + pass + + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='a{sv}a{sv}', + out_signature='aa{sv}u', + byte_arrays=True) + def find(self, query, options): + return self._find(query, options) + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='sa{sv}a{sv}', + out_signature='aa{sv}u', + byte_arrays=True) + def textsearch(self, querystring, query, options): + return self._find(query, options, querystring) + + @trace(logger=logger) + def _find(self, query, options, querystring=None): if not self._index_updating: try: - uids, count = self._index_store.find(query) + object_ids, count = self._index_store.find(query, querystring, + options) except Exception: logging.exception('Failed to query index, will rebuild') self._rebuild_index() if self._index_updating: logging.warning('Index updating, returning all entries') - return self._find_all(query, properties) + return self._find_all(query, options) entries = [] - for uid in uids: - entry_path = layoutmanager.get_instance().get_entry_path(uid) + for object_id in object_ids: + entry_path = layoutmanager.get_instance().get_entry_path(object_id) if not os.path.exists(entry_path): logging.warning( 'Inconsistency detected, returning all entries') self._rebuild_index() - return self._find_all(query, properties) + return self._find_all(query, options) - metadata = self._metadata_store.retrieve(uid, properties) + metadata = self._metadata_store.retrieve(object_id, + options.get('metadata')) entries.append(metadata) - logger.debug('find(): %r', 
time.time() - t) - return entries, count - def _find_all(self, query, properties): - uids = layoutmanager.get_instance().find_all() - count = len(uids) + def _find_all(self, query, options): + object_ids = layoutmanager.get_instance().find_all() + + if not options.get('all_versions', False): + tids_vtime = {} + for tree_id, version_id in object_ids: + tids_vtime.setdefault(tree_id, []).append((version_id, + self._metadata_store.retrieve((tree_id, version_id), + 'timestamp'))) + + object_ids = [(tree_id, sorted(candidates, + key=lambda candidate_id: candidate_id[1], reverse=True)[0][0]) + for (tree_id, candidates) in tids_vtime.items()] + + count = len(object_ids) - offset = query.get('offset', 0) - limit = query.get('limit', MAX_QUERY_LIMIT) - uids = uids[offset:offset + limit] + offset = options.get('offset', 0) + limit = options.get('limit', MAX_QUERY_LIMIT) + object_ids = object_ids[offset:offset + limit] entries = [] - for uid in uids: - metadata = self._metadata_store.retrieve(uid, properties) + for object_id in object_ids: + metadata = self._metadata_store.retrieve(object_id, + options.get('metadata')) entries.append(metadata) return entries, count - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='ss', out_signature='s', - sender_keyword='sender') - def get_filename(self, uid, sender=None): - logging.debug('datastore.get_filename %r', uid) + sender_keyword='sender', + byte_arrays=True) + @trace(logger=logger) + def get_data(self, tree_id, version_id, sender=None): + """Make given entry readable and return path. + + Hardlinks the given object into a global directory, making it readable + to the caller. Fully synchronous. + Signal GotData(sender, object_id, path) is emitted when data + is available. Sender is the bus name of the sender of the get_data() + request. GotData() may be sent before get_data() returns. + """ + return self._get_data(tree_id, version_id, sender) + + def _get_data(self, tree_id, version_id, sender=None): + object_id = (tree_id, version_id) user_id = dbus.Bus().get_unix_user(sender) - extension = self._get_extension(uid) - return self._file_store.retrieve(uid, user_id, extension) + extension = self._get_extension(object_id) + path = self._file_store.retrieve(object_id, user_id, extension) + self.GotData(sender, tree_id, version_id, path) + return path + + @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssss') + def GotData(self, sender, tree_id, version_id, path): + """Data is available. + + Data of entry identified by (tree_id, version_id) as requested by + sender is available at path now. Permissions are matching the sender, + so path may not be accessible to all listeners. 
+ """ + pass - def _get_extension(self, uid): - mime_type = self._metadata_store.get_property(uid, 'mime_type') - if mime_type is None or not mime_type: + def _get_extension(self, object_id): + mime_type = self._metadata_store.get_property(object_id, 'mime_type') + if not mime_type: return '' return mime.get_primary_extension(mime_type) - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='a{sv}') - def get_properties(self, uid): - logging.debug('datastore.get_properties %r', uid) - metadata = self._metadata_store.retrieve(uid) - return metadata - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}', + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='a{sv}s', out_signature='as') - def get_uniquevaluesfor(self, propertyname, query=None): - if propertyname != 'activity': - raise ValueError('Only ''activity'' is a supported property name') + def find_unique_values(self, query, name): + """Collect unique values of property. + + Similar to find(), but returns the set of unique values for the + requested property. + """ + return self._find_unique_values(query, name) + + def _find_unique_values(self, query, name): + if name != 'activity': + raise ValueError('Only "activity" is a supported property name') if query: raise ValueError('The query parameter is not supported') if not self._index_updating: @@ -371,57 +688,48 @@ class DataStore(dbus.service.Object): logging.warning('Index updating, returning an empty list') return [] - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='') - def delete(self, uid): - self._optimizer.remove(uid) + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='ss', + out_signature='', + byte_arrays=True) + def delete(self, tree_id, version_id): + """Delete entry. + + Remove given object from data store. Fully synchronous. Doesn't + return anything, emits signal Deleted(tree_id, version_id). 
+ """ + return self._delete(tree_id, version_id) - self._index_store.delete(uid) - self._file_store.delete(uid) - self._metadata_store.delete(uid) + def _delete(self, tree_id, version_id): + object_id = (tree_id, version_id) + self._optimizer.remove(object_id) - entry_path = layoutmanager.get_instance().get_entry_path(uid) + self._index_store.delete(object_id) + self._file_store.delete(object_id) + self._metadata_store.delete(object_id) + + entry_path = layoutmanager.get_instance().get_entry_path(object_id) os.removedirs(entry_path) - self.Deleted(uid) - logger.debug('deleted %s', uid) + self.Deleted(*object_id) + logger.debug('deleted %r', object_id) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Deleted(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ss') + def Deleted(self, tree_id, version_id): + """Entry has been deleted.""" pass def stop(self): """shutdown the service""" + self._stop() + + def _stop(self): self._index_store.close_index() self.Stopped() - @dbus.service.signal(DS_DBUS_INTERFACE) + @dbus.service.signal(DS_DBUS_INTERFACE2) def Stopped(self): pass - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="sa{sv}", - out_signature='s') - def mount(self, uri, options=None): - return '' - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="", - out_signature="aa{sv}") - def mounts(self): - return [{'id': 1}] - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="s", - out_signature="") - def unmount(self, mountpoint_id): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Mounted(self, descriptior): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Unmounted(self, descriptor): - pass + def _gen_uuid(self): + return str(uuid.uuid4()) diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index 592c41a..ef6aba8 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -32,15 +32,16 @@ class FileStore(object): # TODO: add protection against store and retrieve operations on entries # that are being processed async. - def store(self, uid, file_path, transfer_ownership, completion_cb): + def store(self, object_id, file_path, transfer_ownership, completion_cb): """Store a file for a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + layout_manager = layoutmanager.get_instance() + dir_path = layout_manager.get_entry_path(object_id) if not os.path.exists(dir_path): os.makedirs(dir_path) - destination_path = layoutmanager.get_instance().get_data_path(uid) + destination_path = layout_manager.get_data_path(object_id) if file_path: if not os.path.isfile(file_path): raise ValueError('No file at %r' % file_path) @@ -89,15 +90,19 @@ class FileStore(object): unlink_src) async_copy.start() - def retrieve(self, uid, user_id, extension): + def has_data(self, object_id): + file_path = layoutmanager.get_instance().get_data_path(object_id) + return os.path.exists(file_path) + + def retrieve(self, object_id, user_id, extension): """Place the file associated to a given entry into a directory where the user can read it. The caller is reponsible for deleting this file. 
""" - file_path = layoutmanager.get_instance().get_data_path(uid) + file_path = layoutmanager.get_instance().get_data_path(object_id) if not os.path.exists(file_path): - logging.debug('Entry %r doesnt have any file', uid) + logging.debug('Entry %r doesnt have any file', object_id) return '' if extension is None: @@ -125,7 +130,7 @@ class FileStore(object): if use_instance_dir: os.umask(old_umask) - fd, destination_path = tempfile.mkstemp(prefix=uid + '_', + fd, destination_path = tempfile.mkstemp(prefix='%s-%s_' % object_id, suffix=extension, dir=destination_dir) os.close(fd) os.unlink(destination_path) @@ -143,21 +148,21 @@ class FileStore(object): return destination_path - def get_file_path(self, uid): - return layoutmanager.get_instance().get_data_path(uid) + def get_file_path(self, object_id): + return layoutmanager.get_instance().get_data_path(object_id) - def delete(self, uid): + def delete(self, object_id): """Remove the file associated to a given entry. """ - file_path = layoutmanager.get_instance().get_data_path(uid) + file_path = layoutmanager.get_instance().get_data_path(object_id) if os.path.exists(file_path): os.remove(file_path) - def hard_link_entry(self, new_uid, existing_uid): - existing_file = layoutmanager.get_instance().get_data_path( - existing_uid) - new_file = layoutmanager.get_instance().get_data_path(new_uid) + def hard_link_entry(self, new_object_id, existing_object_id): + layout_manager = layoutmanager.get_instance() + existing_file = layout_manager.get_data_path(existing_object_id) + new_file = layout_manager.get_data_path(new_object_id) logging.debug('removing %r', new_file) os.remove(new_file) @@ -183,8 +188,10 @@ class AsyncCopy(object): self.size = 0 def _cleanup(self): - os.close(self.src_fp) - os.close(self.dest_fp) + if self.src_fp != -1: + os.close(self.src_fp) + if self.dest_fp != -1: + os.close(self.dest_fp) os.chmod(self.dest, 0400) def _copy_block(self, user_data=None): @@ -208,9 +215,10 @@ class AsyncCopy(object): self._complete(None) return False except Exception, err: - logging.error('AC: Error copying %s -> %s: %r', self.src, self. 
- dest, err) - self._complete(err) + logging.exception('AC: Error copying %s -> %s', self.src, + self.dest) + self._cleanup() + self.completion(err) return False return True diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py index 80a1ade..c9cd052 100644 --- a/src/carquinyol/indexstore.py +++ b/src/carquinyol/indexstore.py @@ -25,16 +25,18 @@ from xapian import WritableDatabase, Document, Enquire, Query from carquinyol import layoutmanager from carquinyol.layoutmanager import MAX_QUERY_LIMIT -_VALUE_UID = 0 +_VALUE_TREE_ID = 0 _VALUE_TIMESTAMP = 1 _VALUE_TITLE = 2 -# 3 reserved for version support +_VALUE_VERSION_ID = 3 _VALUE_FILESIZE = 4 _VALUE_CREATION_TIME = 5 _PREFIX_NONE = 'N' _PREFIX_FULL_VALUE = 'F' -_PREFIX_UID = 'Q' +_PREFIX_OBJECT_ID = 'O' +_PREFIX_TREE_ID = 'Q' +_PREFIX_VERSION_ID = 'V' _PREFIX_ACTIVITY = 'A' _PREFIX_ACTIVITY_ID = 'I' _PREFIX_MIME_TYPE = 'M' @@ -51,7 +53,8 @@ _PROPERTIES_NOT_TO_INDEX = ['timestamp', 'preview'] _MAX_RESULTS = int(2 ** 31 - 1) _QUERY_TERM_MAP = { - 'uid': _PREFIX_UID, + 'tree_id': _PREFIX_TREE_ID, + 'version_id': _PREFIX_VERSION_ID, 'activity': _PREFIX_ACTIVITY, 'activity_id': _PREFIX_ACTIVITY_ID, 'mime_type': _PREFIX_MIME_TYPE, @@ -257,34 +260,36 @@ class IndexStore(object): for f in os.listdir(index_path): os.remove(os.path.join(index_path, f)) - def contains(self, uid): - postings = self._database.postlist(_PREFIX_FULL_VALUE + \ - _PREFIX_UID + uid) + def contains(self, object_id): + postings = self._database.postlist(self._object_id_term(object_id)) try: __ = postings.next() except StopIteration: return False return True - def store(self, uid, properties): + def store(self, object_id, properties): + tree_id, version_id = object_id + id_term = self._object_id_term(object_id) document = Document() - document.add_value(_VALUE_UID, uid) + document.add_value(_VALUE_TREE_ID, tree_id) + document.add_value(_VALUE_VERSION_ID, version_id) + document.add_term(id_term) term_generator = TermGenerator() term_generator.index_document(document, properties) - if not self.contains(uid): - self._database.add_document(document) + if self.contains(object_id): + self._database.replace_document(id_term, document) else: - self._database.replace_document(_PREFIX_FULL_VALUE + \ - _PREFIX_UID + uid, document) + self._database.add_document(document) self._flush() - def find(self, query): - offset = query.pop('offset', 0) - limit = query.pop('limit', MAX_QUERY_LIMIT) - order_by = query.pop('order_by', []) - query_string = query.pop('query', None) + def find(self, query, query_string, options): + offset = options.pop('offset', 0) + limit = options.pop('limit', MAX_QUERY_LIMIT) + order_by = options.pop('order_by', []) + all_versions = options.pop('all_versions', False) query_parser = QueryParser() query_parser.set_database(self._database) @@ -300,38 +305,101 @@ class IndexStore(object): order_by = order_by[0] if order_by == '+timestamp': - enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) - enquire.set_docid_order(False) + order_by_value = _VALUE_TIMESTAMP + order_by_direction = True elif order_by == '-timestamp': - enquire.set_sort_by_value(_VALUE_TIMESTAMP, False) - enquire.set_docid_order(True) + order_by_value = _VALUE_TIMESTAMP + order_by_direction = False elif order_by == '+title': - enquire.set_sort_by_value(_VALUE_TITLE, True) + order_by_value = _VALUE_TITLE + order_by_direction = True elif order_by == '-title': - enquire.set_sort_by_value(_VALUE_TITLE, False) + order_by_value = _VALUE_TITLE + order_by_direction = False elif order_by == 
'+filesize': - enquire.set_sort_by_value(_VALUE_FILESIZE, True) + order_by_value = _VALUE_FILESIZE + order_by_direction = True elif order_by == '-filesize': - enquire.set_sort_by_value(_VALUE_FILESIZE, False) + order_by_value = _VALUE_FILESIZE + order_by_direction = False elif order_by == '+creation_time': - enquire.set_sort_by_value(_VALUE_CREATION_TIME, True) + order_by_value = _VALUE_CREATION_TIME + order_by_direction = True elif order_by == '-creation_time': - enquire.set_sort_by_value(_VALUE_CREATION_TIME, False) + order_by_value = _VALUE_CREATION_TIME + order_by_direction = False else: + order_by_value = _VALUE_TIMESTAMP + order_by_direction = True logging.warning('Unsupported property for sorting: %s', order_by) order_by = '+timestamp' - query_result = enquire.get_mset(offset, limit, check_at_least) - total_count = query_result.get_matches_estimated() + logging.debug('order_by=%r, order_by_value=%r, order_by_direction=%r', + order_by, order_by_value, order_by_direction) + enquire.set_sort_by_value(order_by_value, reverse=order_by_direction) + enquire.set_docid_order({True: enquire.DESCENDING, + False: enquire.ASCENDING}[order_by_direction]) + + if not all_versions: + enquire.set_collapse_key(_VALUE_TREE_ID) + + if all_versions or (order_by == '+timestamp'): + logging.debug('using Xapian for sorting') +# query_result = enquire.get_mset(offset, limit, check_at_least) + # FIXME: work around Xapian returning incorrect match counts + query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT) + else: + # Xapian doesn't support using a different sort order while + # collapsing (which needs to be timestamp in our case), so + # we need to query everything and sort+limit ourselves. + logging.debug('using Xapian for collapsing only') + enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) + enquire.set_docid_order(enquire.ASCENDING) + query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT) + + total_count = query_result.get_matches_lower_bound() + documents = [hit.document for hit in query_result] + + if (not all_versions) and (order_by != '+timestamp'): + logging.debug('sorting in Python') + def _cmp(a, b): + value_a = a.get_value(order_by_value) + value_b = b.get_value(order_by_value) + if value_a < value_b: + return -1 + elif value_a > value_b: + return 1 + elif a.get_docid() < b.get_docid(): + return -1 + elif a.get_docid() > b.get_docid(): + return 1 + return 0 + + documents.sort(cmp=_cmp, reverse=order_by_direction) + documents = documents[offset:offset+limit] + else: + # FIXME: work around Xapian returning incorrect match counts + logging.debug('doing offset/limit in Python (%r results, offset %r, limit %r)', + len(documents), offset, limit) + documents = documents[offset:offset+limit] + + object_ids = [] + for document in documents: + object_ids.append((document.get_value(_VALUE_TREE_ID), + document.get_value(_VALUE_VERSION_ID))) - uids = [] - for hit in query_result: - uids.append(hit.document.get_value(_VALUE_UID)) + return (object_ids, total_count) - return (uids, total_count) + def delete(self, object_id): + object_id_term = self._object_id_term(object_id) - def delete(self, uid): - self._database.delete_document(_PREFIX_FULL_VALUE + _PREFIX_UID + uid) + enquire = Enquire(self._database) + enquire.set_query(Query(object_id_term)) + query_results = enquire.get_mset(0, 2, 2) + documents = [hit.document for hit in query_results] + assert len(documents) == 1 + + self._database.delete_document(object_id_term) self._flush() def get_activities(self): @@ -341,6 +409,9 @@ 
class IndexStore(object): activities.append(term.term[len(prefix):]) return activities + def _object_id_term(self, object_id): + return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id + def flush(self): self._flush(True) diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py index 3179a98..afb9e42 100644 --- a/src/carquinyol/layoutmanager.py +++ b/src/carquinyol/layoutmanager.py @@ -58,15 +58,21 @@ class LayoutManager(object): version_path = os.path.join(self._root_path, 'version') open(version_path, 'w').write(str(version)) - def get_entry_path(self, uid): + def get_entry_path(self, object_id): # os.path.join() is just too slow - return '%s/%s/%s' % (self._root_path, uid[:2], uid) + tree_id, version_id = object_id + return '%s/%s/%s/%s' % (self._root_path, tree_id[:2], tree_id, + version_id) - def get_data_path(self, uid): - return '%s/%s/%s/data' % (self._root_path, uid[:2], uid) + def get_data_path(self, object_id): + tree_id, version_id = object_id + return '%s/%s/%s/%s/data' % (self._root_path, tree_id[:2], tree_id, + version_id) - def get_metadata_path(self, uid): - return '%s/%s/%s/metadata' % (self._root_path, uid[:2], uid) + def get_metadata_path(self, object_id): + tree_id, version_id = object_id + return '%s/%s/%s/%s/metadata' % (self._root_path, tree_id[:2], tree_id, + version_id) def get_root_path(self): return self._root_path @@ -81,13 +87,24 @@ class LayoutManager(object): return os.path.join(self.get_checksums_dir(), 'queue') def find_all(self): - uids = [] - for f in os.listdir(self._root_path): - if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: - for g in os.listdir(os.path.join(self._root_path, f)): - if len(g) == 36: - uids.append(g) - return uids + object_ids = [] + for tree_2 in os.listdir(self._root_path): + tree_2_path = os.path.join(self._root_path, tree_2) + if not (os.path.isdir(tree_2_path) and len(tree_2) == 2): + continue + + for tree_id in os.listdir(tree_2_path): + if len(tree_id) != 36: + continue + + tree_path = os.path.join(tree_2_path, tree_id) + for version_id in os.listdir(tree_path): + if len(version_id) != 36: + continue + + object_ids.append((tree_id, version_id)) + + return object_ids def is_empty(self): """Check if there is any existing entry. 
@@ -98,13 +115,21 @@ class LayoutManager(object): # unmigrated 0.82 data store return False - for f in os.listdir(self._root_path): - if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: - for g in os.listdir(os.path.join(self._root_path, f)): - if len(g) == 36: - return False + for tree_2 in os.listdir(self._root_path): + tree_2_path = os.path.join(self._root_path, tree_2) + if not (os.path.isdir(tree_2_path) and len(tree_2) == 2): + continue + + for tree_id in os.listdir(tree_2_path): + if len(tree_id) != 36: + continue + + tree_path = os.path.join(tree_2_path, tree_id) + return False + return True + _instance = None diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py index 5967017..c8ba56d 100644 --- a/src/carquinyol/metadatastore.py +++ b/src/carquinyol/metadatastore.py @@ -9,8 +9,9 @@ _INTERNAL_KEYS = ['checksum'] class MetadataStore(object): - def store(self, uid, metadata): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) + def store(self, object_id, metadata): + layout_manager = layoutmanager.get_instance() + metadata_path = layout_manager.get_metadata_path(object_id) if not os.path.exists(metadata_path): os.makedirs(metadata_path) else: @@ -18,14 +19,10 @@ class MetadataStore(object): if key not in _INTERNAL_KEYS: os.remove(os.path.join(metadata_path, key)) - metadata['uid'] = uid + tree_id, version_id = object_id + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id for key, value in metadata.items(): - - # Hack to support activities that still pass properties named as - # for example title:text. - if ':' in key: - key = key.split(':', 1)[0] - f = open(os.path.join(metadata_path, key), 'w') try: if isinstance(value, unicode): @@ -36,25 +33,31 @@ class MetadataStore(object): finally: f.close() - def retrieve(self, uid, properties=None): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) + def retrieve(self, object_id, properties=None): + layout_manager = layoutmanager.get_instance() + metadata_path = layout_manager.get_metadata_path(object_id) + if properties: + return metadatareader.retrieve(metadata_path, list(properties)) return metadatareader.retrieve(metadata_path, properties) - def delete(self, uid): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) + def delete(self, object_id): + layout_manager = layoutmanager.get_instance() + metadata_path = layout_manager.get_metadata_path(object_id) for key in os.listdir(metadata_path): os.remove(os.path.join(metadata_path, key)) os.rmdir(metadata_path) - def get_property(self, uid, key): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) + def get_property(self, object_id, key): + layout_manager = layoutmanager.get_instance() + metadata_path = layout_manager.get_metadata_path(object_id) property_path = os.path.join(metadata_path, key) if os.path.exists(property_path): return open(property_path, 'r').read() else: return None - def set_property(self, uid, key, value): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) + def set_property(self, object_id, key, value): + layout_manager = layoutmanager.get_instance() + metadata_path = layout_manager.get_metadata_path(object_id) property_path = os.path.join(metadata_path, key) open(property_path, 'w').write(value) diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py index 1745f2c..d36fb09 100644 --- a/src/carquinyol/migration.py +++ b/src/carquinyol/migration.py @@ -23,10 +23,12 @@ import shutil import time import cjson 
+import uuid -from carquinyol import layoutmanager +from carquinyol import layoutmanager, metadatastore DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' +mstore = metadatastore.MetadataStore() def migrate_from_0(): @@ -38,22 +40,24 @@ def migrate_from_0(): return for f in os.listdir(old_root_path): - uid, ext = os.path.splitext(f) + tree_id, ext = os.path.splitext(f) if ext != '.metadata': continue - logging.debug('Migrating entry %r', uid) - - new_entry_dir = layoutmanager.get_instance().get_metadata_path(uid) + logging.debug('Migrating entry %r', tree_id) + version_id = _gen_uuid() + object_id = (tree_id, version_id) + new_entry_dir = layoutmanager.get_instance().get_metadata_path( + object_id) if not os.path.exists(new_entry_dir): os.makedirs(new_entry_dir) try: - _migrate_metadata(root_path, old_root_path, uid) - _migrate_file(root_path, old_root_path, uid) - _migrate_preview(root_path, old_root_path, uid) + _migrate_metadata_0(root_path, old_root_path, object_id) + _migrate_file_0(root_path, old_root_path, object_id) + _migrate_preview_0(root_path, old_root_path, object_id) except Exception: - logging.exception('Error while migrating entry %r', uid) + logging.exception('Error while migrating entry %r', object_id) # Just be paranoid, it's cheap. if old_root_path.endswith('datastore/store'): @@ -62,13 +66,16 @@ def migrate_from_0(): logging.info('Migration finished') -def _migrate_metadata(root_path, old_root_path, uid): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) - old_metadata_path = os.path.join(old_root_path, uid + '.metadata') +def _migrate_metadata_0(root_path, old_root_path, object_id): + tree_id, version_id = object_id + dir_path = layoutmanager.get_instance().get_entry_path(object_id) + metadata_path = layoutmanager.get_instance().get_metadata_path(object_id) + old_metadata_path = os.path.join(old_root_path, tree_id + '.metadata') metadata = cjson.decode(open(old_metadata_path, 'r').read()) - if 'uid' not in metadata: - metadata['uid'] = uid + metadata.pop('uid', None) + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id if 'timestamp' not in metadata and 'mtime' in metadata: metadata['timestamp'] = \ @@ -87,17 +94,89 @@ def _migrate_metadata(root_path, old_root_path, uid): f.close() except Exception: logging.exception( - 'Error while migrating property %s of entry %s', key, uid) + 'Error while migrating property %r of entry %r', key, + object_id) -def _migrate_file(root_path, old_root_path, uid): - if os.path.exists(os.path.join(old_root_path, uid)): - new_data_path = layoutmanager.get_instance().get_data_path(uid) - os.rename(os.path.join(old_root_path, uid), +def _migrate_file_0(root_path, old_root_path, object_id): + tree_id, version_id_ = object_id + if os.path.exists(os.path.join(old_root_path, tree_id)): + new_data_path = layoutmanager.get_instance().get_data_path(object_id) + os.rename(os.path.join(old_root_path, tree_id), new_data_path) -def _migrate_preview(root_path, old_root_path, uid): - metadata_path = layoutmanager.get_instance().get_metadata_path(uid) - os.rename(os.path.join(old_root_path, 'preview', uid), +def _migrate_preview_0(root_path, old_root_path, object_id): + tree_id, version_id_ = object_id + dir_path = layoutmanager.get_instance().get_entry_path(object_id) + metadata_path = layoutmanager.get_instance().get_metadata_path(object_id) + os.rename(os.path.join(old_root_path, 'preview', tree_id), os.path.join(metadata_path, 'preview')) + + +def migrate_from_1(): + logging.info('Migrating datastore from version 1 or 2') + + 
layout_manager = layoutmanager.get_instance() + root_path = layout_manager.get_root_path() + checksum_path = layout_manager.get_checksums_dir() + + version_ids = {} + for tree_2 in os.listdir(root_path): + if len(tree_2) != 2: + continue + + for tree_id in os.listdir(os.path.join(root_path, tree_2)): + if (len(tree_id) != 36): + continue + + logging.debug('Migrating entry %r' % tree_id) + + version_id = _gen_uuid() + version_ids[tree_id] = version_id + try: + old_path = os.path.join(root_path, tree_2, tree_id) + tmp_path = os.path.join(root_path, tree_2, tree_id+'.tmp') + os.rename(old_path, tmp_path) + try: + new_path = layout_manager.get_entry_path((tree_id, version_id)) + new_metadata_path = layout_manager.get_metadata_path((tree_id, + version_id)) + os.makedirs(os.path.dirname(new_path)) + os.rename(tmp_path, new_path) + except: + os.rename(tmp_path, old_path) + raise + + metadata = mstore.retrieve((tree_id, version_id)) + metadata.pop('uid', None) + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id + metadata['parent_id'] = '' + metadata['timestamp'] = metadata.get('timestamp', + os.stat(new_path).st_ctime) + mstore.store((tree_id, version_id), metadata) + + except Exception: + logging.exception('Error while migrating entry %r', tree_id) + + for checksum in os.listdir(checksum_path): + entries_path = os.path.join(checksum_path, checksum) + for tree_id in os.listdir(entries_path): + if len(tree_id) != 36: + continue + + try: + os.rename(os.path.join(entries_path, tree_id), + os.path.join(entries_path, '%s,%s' % (tree_id, + version_ids[tree_id]))) + + except Exception: + logging.exception('Error while migrating checksum entry ' + '%r / %r', checksum, tree_id) + + logging.info('Migration finished') + + +def _gen_uuid(): + return str(uuid.uuid4()) diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py index c038c2b..48b05a6 100644 --- a/src/carquinyol/optimizer.py +++ b/src/carquinyol/optimizer.py @@ -33,36 +33,36 @@ class Optimizer(object): self._metadata_store = metadata_store self._enqueue_checksum_id = None - def optimize(self, uid): + def optimize(self, object_id): """Add an entry to a queue of entries to be checked for duplicates. 
""" - if not os.path.exists(self._file_store.get_file_path(uid)): + if not os.path.exists(self._file_store.get_file_path(object_id)): return - queue_path = layoutmanager.get_instance().get_queue_path() - open(os.path.join(queue_path, uid), 'w').close() - logging.debug('optimize %r', os.path.join(queue_path, uid)) + entry_path = self._queue_entry_path(object_id) + open(entry_path, 'w').close() + logging.debug('queueing %r for optimization', entry_path) if self._enqueue_checksum_id is None: self._enqueue_checksum_id = \ gobject.idle_add(self._process_entry_cb, priority=gobject.PRIORITY_LOW) - def remove(self, uid): + def remove(self, object_id): """Remove any structures left from space optimization """ - checksum = self._metadata_store.get_property(uid, 'checksum') + checksum = self._metadata_store.get_property(object_id, 'checksum') if checksum is None: return checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - checksum_entry_path = os.path.join(checksum_path, uid) + checksum_entry_path = self._checksum_entry_path(checksum, object_id) if os.path.exists(checksum_entry_path): - logging.debug('remove %r', checksum_entry_path) + logging.debug('removing %r', checksum_entry_path) os.remove(checksum_entry_path) if os.path.exists(checksum_path): @@ -73,6 +73,14 @@ class Optimizer(object): if e.errno != errno.ENOTEMPTY: raise + def _queue_entry_path(self, object_id): + queue_path = layoutmanager.get_instance().get_queue_path() + return os.path.join(queue_path, '%s,%s' % object_id) + + def _checksum_entry_path(self, checksum, object_id): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + return os.path.join(checksums_dir, checksum, '%s,%s' % object_id) + def _identical_file_already_exists(self, checksum): """Check if we already have files with this checksum. @@ -81,14 +89,14 @@ class Optimizer(object): checksum_path = os.path.join(checksums_dir, checksum) return os.path.exists(checksum_path) - def _get_uid_from_checksum(self, checksum): + def _get_object_id_from_checksum(self, checksum): """Get an existing entry which file matches checksum. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - first_uid = os.listdir(checksum_path)[0] - return first_uid + first_object_id = os.listdir(checksum_path)[0].split(',') + return tuple(first_object_id) def _create_checksum_dir(self, checksum): """Create directory that tracks files with this same checksum. @@ -99,24 +107,21 @@ class Optimizer(object): logging.debug('create dir %r', checksum_path) os.mkdir(checksum_path) - def _add_checksum_entry(self, uid, checksum): - """Create a file in the checksum dir with the uid of the entry + def _add_checksum_entry(self, object_id, checksum): + """Create a file in the checksum dir with the object_id of the entry """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - - logging.debug('touch %r', os.path.join(checksum_path, uid)) - open(os.path.join(checksum_path, uid), 'w').close() + checksum_entry_path = self._checksum_entry_path(checksum, object_id) + logging.debug('touch %r', checksum_entry_path) + open(checksum_entry_path, 'w').close() - def _already_linked(self, uid, checksum): + def _already_linked(self, object_id, checksum): """Check if this entry's file is already a hard link to the checksums dir. 
""" - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) + checksum_entry_path = self._checksum_entry_path(checksum, object_id) + return os.path.exists(checksum_entry_path) def _process_entry_cb(self): """Process one item in the checksums queue by calculating its checksum, @@ -127,30 +132,31 @@ class Optimizer(object): queue_path = layoutmanager.get_instance().get_queue_path() queue = os.listdir(queue_path) if queue: - uid = queue[0] - logging.debug('_process_entry_cb processing %r', uid) + object_id = tuple(queue[0].split(',')) + logging.debug('_process_entry_cb processing %r', object_id) - file_in_entry_path = self._file_store.get_file_path(uid) + file_in_entry_path = self._file_store.get_file_path(object_id) if not os.path.exists(file_in_entry_path): - logging.info('non-existent entry in queue: %r', uid) + logging.info('non-existent entry in queue: %r', object_id) else: - checksum = self._calculate_md5sum(file_in_entry_path) - self._metadata_store.set_property(uid, 'checksum', checksum) + checksum = self.calculate_md5sum(file_in_entry_path) + self._metadata_store.set_property(object_id, 'checksum', + checksum) if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - existing_entry_uid = \ - self._get_uid_from_checksum(checksum) + if not self._already_linked(object_id, checksum): + existing_entry_object_id = \ + self._get_object_id_from_checksum(checksum) - self._file_store.hard_link_entry(uid, - existing_entry_uid) + self._file_store.hard_link_entry(object_id, + existing_entry_object_id) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(object_id, checksum) else: self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(object_id, checksum) - os.remove(os.path.join(queue_path, uid)) + os.remove(self._queue_entry_path(object_id)) if len(queue) <= 1: self._enqueue_checksum_id = None @@ -158,7 +164,7 @@ class Optimizer(object): else: return True - def _calculate_md5sum(self, path): + def calculate_md5sum(self, path): """Calculate the md5 checksum of a given file. """ |