author    | Sascha Silbe <silbe@activitycentral.com> | 2011-03-04 14:02:13 (GMT)
committer | Sascha Silbe <silbe@activitycentral.com> | 2011-03-04 14:02:13 (GMT)
commit    | ad3e6fafb99eca267fc2f470ea7b6cb5b43eec3d (patch)
tree      | 3fff9bb06fd8ad7fbaf47d1ab17f2f765daaf128
parent    | 83d9f81b5e442cd8e9006ec9a474cf6d0913578f (diff)
parent    | 4babd564825dbcad358f8992abcaeefde78943cd (diff)
Merge remote branch 'refs/remotes/origin/t/versions' into HEAD
* refs/remotes/origin/t/versions: (53 commits)
Add gconf setting /desktop/sugar/datastore/max_versions
Allow specifying the version_id of a new version.
add missing pieces for last commit
add compatibility with the old (unversioned) API
New TopGit dependency: t/rainbow-0.8
fix 0.82 migration typos
fix typos
New TopGit dependency: t/migration-rebuild-index
add metadata to Saved signal
adjust wording to new API
test suite: expect/filter out parent_id
save(): ensure parent_id is set correctly in metadata
index store: replace document if already in database (for change_metadata)
change_metadata(): make sure timestamp is set, like we do for save()
fix test suite failure
fix migration of checksum entries
work around Xapian returning incorrect match counts if offset/limit are used
fix sort order in ambiguous cases, fix obscure test suite breakage due to overlapping timestamp values
fix FileStore.retrieve() broken by last merge
test_massops.py: test ordering of find() results (for all supported orders) and offset/limit (for default order)
...
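One of the commits above adds a GConf-backed cap on how many versions of an entry are retained. A minimal sketch of changing that setting from Python, using the standard GConf bindings — the key name comes from data/sugar-datastore.schemas in this merge, the value 5 is just an example:

    import gconf

    # Cap each entry at 5 retained versions; 0 (the schema default)
    # means old versions are never pruned automatically.
    client = gconf.client_get_default()
    client.set_int('/desktop/sugar/datastore/max_versions', 5)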
-rw-r--r-- | .topdeps | 6
-rw-r--r-- | .topmsg | 12
-rw-r--r-- | Makefile.am | 2
-rw-r--r-- | configure.ac | 9
-rw-r--r-- | data/Makefile.am | 11
-rw-r--r-- | data/sugar-datastore.schemas | 19
-rw-r--r-- | src/carquinyol/datastore.py | 698
-rw-r--r-- | src/carquinyol/filestore.py | 48
-rw-r--r-- | src/carquinyol/indexstore.py | 143
-rw-r--r-- | src/carquinyol/layoutmanager.py | 61
-rw-r--r-- | src/carquinyol/metadatastore.py | 37
-rw-r--r-- | src/carquinyol/migration.py | 123
-rw-r--r-- | src/carquinyol/optimizer.py | 82
-rw-r--r-- | tests/basic_api_v2.txt | 6
-rw-r--r-- | tests/basic_api_v3.txt | 188
-rwxr-xr-x | tests/runalltests.py | 5
-rw-r--r-- | tests/test_massops.py | 82
-rw-r--r-- | tests/test_migration_v2_v3.py (renamed from tests/test_migration_v1_v2.py) | 91
18 files changed, 1191 insertions, 432 deletions
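Before the diff itself, a rough sketch of how a client might drive the new versioned D-Bus API (DS_DBUS_INTERFACE2) that this merge introduces. The bus name, object path and the save()/get_data() signatures are taken from src/carquinyol/datastore.py below; everything else (titles, variable names, a running datastore) is assumed for illustration only:

    import dbus

    bus = dbus.SessionBus()
    data_store = dbus.Interface(
        bus.get_object('org.laptop.sugar.DataStore',
                       '/org/laptop/sugar/DataStore2'),
        'org.laptop.sugar.DataStore2')

    # Empty tree_id and parent_id: the data store allocates both and
    # creates a brand-new entry (the equivalent of create() in the old API).
    tree_id, version_id = data_store.save('', '', {'title': 'example'},
                                          '', False)

    # Passing the previous version_id as parent_id saves a new version of
    # the same entry; Saved() is emitted before save() returns.
    tree_id, new_version_id = data_store.save(
        tree_id, version_id, {'title': 'example (edited)'}, '', False)

    # get_data() hard-links the stored file into a readable location and
    # returns its path ('' for metadata-only entries).
    path = data_store.get_data(tree_id, new_version_id)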
diff --git a/.topdeps b/.topdeps
@@ -1 +1,5 @@
-t/query-typed-props
+t/timestamp-range-query-fix
+t/testsuite
+t/bug-1550-mkstemp-only
+t/migration-rebuild-index
+t/rainbow-0.8
diff --git a/.topmsg b/.topmsg
@@ -1,14 +1,6 @@
 From: Sascha Silbe <sascha@silbe.org>
-Subject: [PATCH] fix range query for timestamp to do numerical comparison instead of lexical (#1342)
+Subject: [PATCH] add version support
 
-With the current code, anything Xapian value-stored is converted to a string
-before querying, so numeric types end up as decimal strings. This won't work as
-expected because Xapian does lexical comparison. We need to use
-xapian.sortable_serialise() on numeric values to convert them to an (internal)
-format that will result in "numerical" comparison.
-
-Changes index content format, so needs an index rebuild.
-
-Prerequisite: #1437
+Add version_id to all entries and use new API.
 
 Signed-off-by: Sascha Silbe <sascha@silbe.org>
diff --git a/Makefile.am b/Makefile.am
index d450f24..acd39b4 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,3 +1,3 @@
 ACLOCAL_AMFLAGS = -I m4
 
-SUBDIRS = bin etc src tests
+SUBDIRS = bin data etc src tests
diff --git a/configure.ac b/configure.ac
index 10f2487..beba511 100644
--- a/configure.ac
+++ b/configure.ac
@@ -14,10 +14,19 @@ AC_PROG_LIBTOOL
 AM_PATH_PYTHON([2.5])
 AM_CHECK_PYTHON_HEADERS(,[AC_MSG_ERROR(could not find Python headers)])
 
+PKG_CHECK_MODULES(SHELL, gconf-2.0)
+AC_PATH_PROG(GCONFTOOL, gconftool-2, no)
+
+if test "$GCONFTOOL" = no; then
+  AC_MSG_ERROR([gconftool-2 executable not found in your path - should be installed with GConf])
+fi
+
+AM_GCONF_SOURCE_2
 
 AC_OUTPUT([
 Makefile
 bin/Makefile
+data/Makefile
 etc/Makefile
 src/Makefile
 src/carquinyol/Makefile
diff --git a/data/Makefile.am b/data/Makefile.am
new file mode 100644
index 0000000..9127fa0
--- /dev/null
+++ b/data/Makefile.am
@@ -0,0 +1,11 @@
+sugardir = $(pkgdatadir)/data
+
+schemadir = $(GCONF_SCHEMA_FILE_DIR)
+schema_DATA = sugar-datastore.schemas
+
+install-data-local: $(schema_DATA)
+if GCONF_SCHEMAS_INSTALL
+	GCONF_CONFIG_SOURCE=$(GCONF_SCHEMA_CONFIG_SOURCE) $(GCONFTOOL) --makefile-install-rule sugar-datastore.schemas
+endif
+
+EXTRA_DIST = $(schema_DATA)
diff --git a/data/sugar-datastore.schemas b/data/sugar-datastore.schemas
new file mode 100644
index 0000000..ad38e5c
--- /dev/null
+++ b/data/sugar-datastore.schemas
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+<gconfschemafile>
+  <schemalist>
+    <schema>
+      <key>/schemas/desktop/sugar/datastore/max_versions</key>
+      <applyto>/desktop/sugar/datastore/max_versions</applyto>
+      <owner>sugar-datastore</owner>
+      <type>int</type>
+      <default>0</default>
+      <locale name="C">
+        <short>Maximum number of versions to retain per entry</short>
+        <long>If the given number of versions already exist for an entry, the
+          oldest version will be removed when saving a new version. A value
+          of 0 indicates old versions should never be deleted automatically.
+        </long>
+      </locale>
+    </schema>
+  </schemalist>
+</gconfschemafile>
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
index 0fc8be1..bc8e208 100644
--- a/src/carquinyol/datastore.py
+++ b/src/carquinyol/datastore.py
@@ -25,9 +25,11 @@ import os
 
 import dbus
 import dbus.service
+import gconf
 import gobject
 
 from sugar import mime
+from sugar.logger import trace
 
 from carquinyol import layoutmanager
 from carquinyol import migration
@@ -41,12 +43,207 @@ from carquinyol.optimizer import Optimizer
 
 DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
 
 DS_SERVICE = "org.laptop.sugar.DataStore"
-DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
-DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE1 = "org.laptop.sugar.DataStore"
+DS_OBJECT_PATH1 = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE2 = "org.laptop.sugar.DataStore2"
+DS_OBJECT_PATH2 = "/org/laptop/sugar/DataStore2"
 
 logger = logging.getLogger(DS_LOG_CHANNEL)
 
 
+class DBusAPIv1(dbus.service.Object):
+    """Compatibility layer for old (unversioned) D-Bus API
+    """
+
+    # pylint: disable-msg=W0212
+
+    def __init__(self, data_store):
+        self._data_store = data_store
+        bus_name = dbus.service.BusName(DS_SERVICE, bus=dbus.SessionBus(),
+                                        replace_existing=False,
+                                        allow_replacement=False)
+        dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH1)
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='a{sv}sb', out_signature='s',
+                         async_callbacks=('async_cb', 'async_err_cb'),
+                         byte_arrays=True)
+    def create(self, props, file_path, transfer_ownership,
+               async_cb, async_err_cb):
+
+        def success_cb(tree_id, child_id):
+            self.Created(tree_id)
+            async_cb(tree_id)
+
+        self._data_store._save(tree_id='', parent_id='', metadata=props,
+            path=file_path, delete_after=transfer_ownership,
+            async_cb=success_cb, async_err_cb=async_err_cb)
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+    def Created(self, uid):
+        pass
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='sa{sv}sb',
+                         out_signature='',
+                         async_callbacks=('async_cb', 'async_err_cb'),
+                         byte_arrays=True)
+    def update(self, uid, props, file_path, transfer_ownership,
+               async_cb, async_err_cb):
+
+        def success_cb(tree_id, child_id):
+            self.Updated(tree_id)
+            async_cb()
+
+        latest_versions = self._get_latest(uid)
+        if not latest_versions:
+            raise ValueError('Trying to update non-existant entry %s - wanted'
+                             ' to use create()?' % (uid, ))
+
+        parent = latest_versions[0]
+        if self._compare_checksums(parent, file_path):
+            self._data_store._change_metadata(parent['tree_id'],
+                parent['version_id'], props)
+            return success_cb(uid, None)
+
+        self._data_store._save(tree_id=uid,
+            parent_id=parent['version_id'], metadata=props,
+            path=file_path, delete_after=transfer_ownership,
+            async_cb=success_cb, async_err_cb=async_err_cb)
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+    def Updated(self, uid):
+        pass
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='a{sv}as',
+                         out_signature='aa{sv}u')
+    def find(self, query, properties):
+        if 'uid' in properties:
+            properties.append('tree_id')
+            properties.remove('uid')
+
+        options = {'metadata': properties}
+        for name in ['offset', 'limit', 'order_by']:
+            if name in query:
+                options[name] = query.pop(name)
+
+        if 'uid' in query:
+            query['tree_id'] = query.pop('uid')
+
+        results, count = self._data_store._find(query, options,
+                                                query.get('query'))
+
+        if 'tree_id' in properties:
+            for entry in results:
+                entry['uid'] = entry.pop('tree_id')
+
+        return results, count
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='s',
+                         out_signature='s',
+                         sender_keyword='sender')
+    def get_filename(self, uid, sender=None):
+        latest_versions = self._get_latest(uid)
+        if not latest_versions:
+            raise ValueError('Entry %s does not exist' % (uid, ))
+
+        return self._data_store._get_data(uid,
+            latest_versions[0]['version_id'], sender=sender)
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def get_properties(self, uid):
+        latest_versions = self._get_latest(uid)
+        if not latest_versions:
+            raise ValueError('Entry %s does not exist' % (uid, ))
+
+        latest_versions[0]['uid'] = latest_versions[0].pop('tree_id')
+        del latest_versions[0]['version_id']
+        return latest_versions[0]
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='sa{sv}',
+                         out_signature='as')
+    def get_uniquevaluesfor(self, propertyname, query=None):
+        return self._data_store._find_unique_values(query, propertyname)
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature='s',
+                         out_signature='')
+    def delete(self, uid):
+        latest_versions = self._get_latest(uid)
+        if not latest_versions:
+            raise ValueError('Entry %s does not exist' % (uid, ))
+
+        self._data_store._delete(tree_id=uid,
+            version_id=latest_versions[0]['version_id'])
+        self.Deleted(uid)
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1, signature="s")
+    def Deleted(self, uid):
+        pass
+
+    def stop(self):
+        """shutdown the service"""
+        self._data_store._stop()
+        self.Stopped()
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1)
+    def Stopped(self):
+        pass
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature="sa{sv}",
+                         out_signature='s')
+    def mount(self, uri, options=None):
+        return ''
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature="",
+                         out_signature="aa{sv}")
+    def mounts(self):
+        return [{'id': 1}]
+
+    @dbus.service.method(DS_DBUS_INTERFACE1,
+                         in_signature="s",
+                         out_signature="")
+    def unmount(self, mountpoint_id):
+        pass
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}")
+    def Mounted(self, descriptior):
+        pass
+
+    @dbus.service.signal(DS_DBUS_INTERFACE1, signature="a{sv}")
+    def Unmounted(self, descriptor):
+        pass
+
+    def _get_latest(self, uid):
+        return self._data_store._find({'tree_id': uid},
+            {'limit': 1, 'order_by': ['+timestamp']})[0]
+
+    @trace()
+    def _compare_checksums(self, parent, child_data):
+        parent_data = self._data_store._file_store.get_file_path(
+            (parent['tree_id'], parent['version_id']))
+        if bool(child_data) ^ os.path.exists(parent_data):
+            return False
+        elif not child_data:
+            return True
+
+        if 'checksum' in parent:
+            parent_checksum = parent['checksum']
+        else:
+            parent_checksum = self._data_store._optimizer.calculate_md5sum(
+                parent_data)
+
+        child_checksum = self._data_store._optimizer.calculate_md5sum(
+            child_data)
+        return parent_checksum == child_checksum
+
+
 class DataStore(dbus.service.Object):
     """D-Bus API and logic for connecting all the other components.
     """
@@ -56,7 +253,12 @@ class DataStore(dbus.service.Object):
             bus=dbus.SessionBus(),
             replace_existing=False,
            allow_replacement=False)
-        dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
+        dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH2)
+        self._api_v1 = DBusAPIv1(self)
+        gconf_client = gconf.client_get_default()
+        self._max_versions = gconf_client.get_int(
+            '/desktop/sugar/datastore/max_versions')
+        logging.debug('max_versions=%r', self._max_versions)
 
         migrated, initiated = self._open_layout()
 
@@ -104,7 +306,7 @@ class DataStore(dbus.service.Object):
 
         if old_version == 0:
             migration.migrate_from_0()
-        elif old_version in [1, 2, 4]
+        elif old_version in [1, 2, 4]:
             migration.migrate_from_1()
 
         layout_manager.set_version(layoutmanager.CURRENT_LAYOUT_VERSION)
@@ -119,26 +321,26 @@ class DataStore(dbus.service.Object):
 
     def _update_index(self):
         """Find entries that are not yet in the index and add them."""
-        uids = layoutmanager.get_instance().find_all()
+        object_ids = layoutmanager.get_instance().find_all()
         logging.debug('Going to update the index with object_ids %r',
-            uids)
+            object_ids)
         self._index_updating = True
-        gobject.idle_add(lambda: self.__update_index_cb(uids),
+        gobject.idle_add(lambda: self.__update_index_cb(object_ids),
             priority=gobject.PRIORITY_LOW)
 
-    def __update_index_cb(self, uids):
-        if uids:
-            uid = uids.pop()
+    def __update_index_cb(self, object_ids):
+        if object_ids:
+            object_id = object_ids.pop()
 
-            logging.debug('Updating entry %r in index. %d to go.', uid,
-                len(uids))
+            logging.debug('Updating entry %r in index. %d to go.', object_id,
+                len(object_ids))
 
-            if not self._index_store.contains(uid):
+            if not self._index_store.contains(object_id):
                 try:
                     update_metadata = False
-                    props = self._metadata_store.retrieve(uid)
+                    props = self._metadata_store.retrieve(object_id)
                     if 'filesize' not in props:
-                        path = self._file_store.get_file_path(uid)
+                        path = self._file_store.get_file_path(object_id)
                         if os.path.exists(path):
                             props['filesize'] = os.stat(path).st_size
                             update_metadata = True
@@ -157,12 +359,12 @@ class DataStore(dbus.service.Object):
                             props['creation_time'] = props['timestamp']
                             update_metadata = True
                         if update_metadata:
-                            self._metadata_store.store(uid, props)
-                        self._index_store.store(uid, props)
+                            self._metadata_store.store(object_id, props)
+                        self._index_store.store(object_id, props)
                 except Exception:
-                    logging.exception('Error processing %r', uid)
+                    logging.exception('Error processing %r', object_id)
 
-        if not uids:
+        if not object_ids:
             self._index_store.flush()
             self._index_updating = False
             logging.debug('Finished updating index.')
@@ -170,199 +372,314 @@ class DataStore(dbus.service.Object):
         else:
             return True
 
-    def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
-        logger.debug('_create_completion_cb(%r, %r, %r, %r)', async_cb,
-            async_err_cb, uid, exc)
-        if exc is not None:
-            async_err_cb(exc)
-            return
+    @dbus.service.method(DS_DBUS_INTERFACE2,
+                         in_signature='ssa{sv}sb',
+                         out_signature='ss',
+                         async_callbacks=('async_cb', 'async_err_cb'),
+                         byte_arrays=True)
+    def save(self, tree_id, parent_id, metadata, path, delete_after, async_cb,
+             async_err_cb):
+        """Store new (version of an) object and return (tree_id, child_id).
+
+        Submit new version (data and/or metadata) of the given object,
+        returning (tree_id, child_id). parent_id and child_id are the
+        version_id of the predecessor resp. the to-be-saved entry.
+
+        If tree_id is empty parent_id must be empty as well and both will be
+        newly allocated (equivalent to create() in previous API). If only
+        parent_id is empty there must not be any entry with the same tree_id
+        already in datastore.
+
+        File storage happens synchronously, emitting the signal
+        Saved(tree_id, parent_id, child_id) on completion (i.e. before
+        returning from save()).
+
+        If path is non-empty it designates a file or directory containing the
+        data and the parent must contain of the same type (file / directory;
+        not metadata-only), if a parent exists at all. If path is empty and
+        the parent contained data then a metadata-only update is performed,
+        reusing the data of the parent. If both parent_id and path are empty
+        a metadata-only entry is created.
+
+        If delete_after is True the contents of path are deleted after they
+        have been committed, but before sending the Saved() signal.
+
+        metadata may contain a version_id field that will be used for the new
+        entry. In that case tree_id must be non-empty and there must not be an
+        existing entry in the data store with the same tree_id and version_id.
+        This functionality is intended for restoring from backups and importing
+        entries from other data stores; most API clients should leave it up to
+        the data store to choose a unique version_id.
+ """ + self._save(tree_id, parent_id, metadata, path, delete_after, async_cb, + async_err_cb) - self.Created(uid) - self._optimizer.optimize(uid) - logger.debug('created %s', uid) - async_cb(uid) + @trace(logger=logger) + def _save(self, tree_id, parent_id, metadata, path, delete_after, async_cb, + async_err_cb): + if path: + if not os.access(path, os.R_OK): + raise ValueError('Invalid path given.') - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}sb', - out_signature='s', - async_callbacks=('async_cb', 'async_err_cb'), - byte_arrays=True) - def create(self, props, file_path, transfer_ownership, - async_cb, async_err_cb): - uid = str(uuid.uuid4()) - logging.debug('datastore.create %r', uid) + if delete_after and not os.access(os.path.dirname(path), os.W_OK): + raise ValueError('Deletion requested for read-only directory') + + if (not tree_id) and parent_id: + raise ValueError('tree_id is empty but parent_id is not') + + if tree_id and not parent_id: + if self._find({'tree_id': tree_id}, {'limit': 1})[1]: + raise ValueError('No parent_id given but tree_id already ' + 'exists') + + elif parent_id: + if not self._find({'tree_id': tree_id, 'version_id': parent_id}, + {'limit': 1})[1]: + raise ValueError('Given parent_id does not exist') + + if not tree_id: + tree_id = self._gen_uuid() + + child_id = metadata.get('version_id') + if not child_id: + child_id = self._gen_uuid() + elif not tree_id: + raise ValueError('No tree_id given but metadata contains version_id') + elif self._find({'tree_id': tree_id, 'version_id': child_id}, + {'limit': 1})[1]: + + raise ValueError('There is an existing entry with the same tree_id' + ' and version_id') - if not props.get('timestamp', ''): - props['timestamp'] = int(time.time()) + if not tree_id: + tree_id = self._gen_uuid() + + metadata.setdefault('timestamp', time.time()) + metadata['parent_id'] = parent_id # FIXME: Support for the deprecated ctime property. Remove in 0.92. - if 'ctime' in props: + if 'ctime' in metadata: try: - props['creation_time'] = time.mktime(time.strptime( - migration.DATE_FORMAT, props['ctime'])) + metadata['creation_time'] = time.mktime(time.strptime( + migration.DATE_FORMAT, metadata['ctime'])) except (TypeError, ValueError): pass - if 'creation_time' not in props: - props['creation_time'] = props['timestamp'] - - if os.path.exists(file_path): - stat = os.stat(file_path) - props['filesize'] = stat.st_size - else: - props['filesize'] = 0 + if 'creation_time' not in metadata: + metadata['creation_time'] = metadata['timestamp'] - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, - lambda * args: self._create_completion_cb(async_cb, - async_err_cb, - uid, - * args)) + # FIXME: how to handle creation_time vs. versions? 
-    @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
-    def Created(self, uid):
-        pass
+        if os.path.exists(path):
+            stat = os.stat(path)
+            metadata['filesize'] = stat.st_size
+        else:
+            metadata['filesize'] = 0
+
+        object_id = (tree_id, child_id)
+        self._metadata_store.store(object_id, metadata)
+        self._index_store.store(object_id, metadata)
+
+        if (not path) and self._file_store.has_data((tree_id, parent_id)):
+            # metadata-only update, reuse parent data
+            path = self._file_store.retrieve((tree_id, parent_id),
+                                             os.getuid(), 'update')
+            delete_after = True
+
+        if path:
+            self._file_store.store(object_id, path, delete_after,
+                lambda *args, **kwargs: self._save_completion_cb(
+                    tree_id, parent_id, child_id, metadata,
+                    async_cb, async_err_cb, **kwargs))
+        else:
+            self._save_completion_cb(tree_id, parent_id, child_id, metadata,
+                                     async_cb, async_err_cb)
 
-    def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
-        logger.debug('_update_completion_cb() called with %r / %r, exc %r',
-            async_cb, async_err_cb, exc)
+    @trace(logger=logger)
+    def _save_completion_cb(self, tree_id, parent_id, child_id, metadata,
+                            async_cb, async_err_cb, exc=None):
+        object_id = (tree_id, child_id)
         if exc is not None:
             async_err_cb(exc)
             return
 
-        self.Updated(uid)
-        self._optimizer.optimize(uid)
-        logger.debug('updated %s', uid)
-        async_cb()
+        self._check_max_versions(tree_id)
+        self.Saved(tree_id, parent_id, child_id, metadata)
+        self._optimizer.optimize(object_id)
+        async_cb(tree_id, child_id)
 
-    @dbus.service.method(DS_DBUS_INTERFACE,
-                         in_signature='sa{sv}sb',
-                         out_signature='',
-                         async_callbacks=('async_cb', 'async_err_cb'),
-                         byte_arrays=True)
-    def update(self, uid, props, file_path, transfer_ownership,
-               async_cb, async_err_cb):
-        logging.debug('datastore.update %r', uid)
+    def _check_max_versions(self, tree_id):
+        if not self._max_versions:
+            return
 
-        if not props.get('timestamp', ''):
-            props['timestamp'] = int(time.time())
+        old_versions = self._find({'tree_id': tree_id},
+            {'all_versions': True, 'offset': self._max_versions,
+             'metadata': ['tree_id', 'version_id', 'timestamp'],
+             'order_by': ['+timestamp']})[0]
+        logging.info('Deleting old versions: %r', old_versions)
+        for entry in old_versions:
+            self._delete(entry['tree_id'], entry['version_id'])
 
-        # FIXME: Support for the deprecated ctime property. Remove in 0.92.
-        if 'ctime' in props:
-            try:
-                props['creation_time'] = time.mktime(time.strptime(
-                    migration.DATE_FORMAT, props['ctime']))
-            except (TypeError, ValueError):
-                pass
+    @dbus.service.signal(DS_DBUS_INTERFACE2, signature='sssa{sv}')
+    def Saved(self, tree_id, parent_id, child_id, metadata):
+        """Entry created or updated.
-        if 'creation_time' not in props:
-            props['creation_time'] = props['timestamp']
-
-        if file_path:
-            # Empty file_path means skipping storage stage, see filestore.py
-            # TODO would be more useful to update filesize after real file save
-            if os.path.exists(file_path):
-                stat = os.stat(file_path)
-                props['filesize'] = stat.st_size
-            else:
-                props['filesize'] = 0
-
-        self._metadata_store.store(uid, props)
-        self._index_store.store(uid, props)
-
-        if os.path.exists(self._file_store.get_file_path(uid)) and \
-                (not file_path or os.path.exists(file_path)):
-            self._optimizer.remove(uid)
-        self._file_store.store(uid, file_path, transfer_ownership,
-            lambda * args: self._update_completion_cb(async_cb,
-                                                      async_err_cb,
-                                                      uid,
-                                                      * args))
-
-    @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
-    def Updated(self, uid):
+        Entry identified by object_id has been saved to permanent storage.
+        """
         pass
 
-    @dbus.service.method(DS_DBUS_INTERFACE,
-                         in_signature='a{sv}as',
-                         out_signature='aa{sv}u')
-    def find(self, query, properties):
-        logging.debug('datastore.find %r', query)
-        t = time.time()
+    @dbus.service.method(DS_DBUS_INTERFACE2,
+                         in_signature='ssa{sv}',
+                         out_signature='',
+                         byte_arrays=True)
+    def change_metadata(self, tree_id, version_id, metadata):
+        """Change metadata of existing entry.
+
+        Changes the metadata of the object identified by object_id to match
+        metadata. Fully synchronous, no return value.
+        Emits signal ChangedMetadata(tree_id, version_id, metadata).
+        """
+        self._change_metadata(tree_id, version_id, metadata)
+
+    @trace(logger=logger)
+    def _change_metadata(self, tree_id, version_id, metadata):
+        object_id = (tree_id, version_id)
+        metadata.setdefault('timestamp', time.time())
+        self._metadata_store.store(object_id, metadata)
+        self._index_store.store(object_id, metadata)
+        self.ChangedMetadata(tree_id, version_id, metadata)
+
+    @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssa{sv}')
+    def ChangedMetadata(self, tree_id, version_id, metadata):
+        """Metadata of entry changed."""
+        pass
+
+    @dbus.service.method(DS_DBUS_INTERFACE2,
+                         in_signature='a{sv}a{sv}',
+                         out_signature='aa{sv}u',
+                         byte_arrays=True)
+    def find(self, query, options):
+        return self._find(query, options)
+
+    @dbus.service.method(DS_DBUS_INTERFACE2,
+                         in_signature='sa{sv}a{sv}',
+                         out_signature='aa{sv}u',
+                         byte_arrays=True)
+    def textsearch(self, querystring, query, options):
+        return self._find(query, options, querystring)
+
+    @trace(logger=logger)
+    def _find(self, query, options, querystring=None):
         if not self._index_updating:
             try:
-                uids, count = self._index_store.find(query)
+                object_ids, count = self._index_store.find(query, querystring,
+                                                           options)
             except Exception:
                 logging.exception('Failed to query index, will rebuild')
                 self._rebuild_index()
 
         if self._index_updating:
             logging.warning('Index updating, returning all entries')
-            return self._find_all(query, properties)
+            return self._find_all(query, options)
 
         entries = []
-        for uid in uids:
-            entry_path = layoutmanager.get_instance().get_entry_path(uid)
+        for object_id in object_ids:
+            entry_path = layoutmanager.get_instance().get_entry_path(object_id)
             if not os.path.exists(entry_path):
                 logging.warning(
                     'Inconsistency detected, returning all entries')
                 self._rebuild_index()
-                return self._find_all(query, properties)
+                return self._find_all(query, options)
 
-            metadata = self._metadata_store.retrieve(uid, properties)
+            metadata = self._metadata_store.retrieve(object_id,
+                                                     options.get('metadata'))
             entries.append(metadata)
 
-        logger.debug('find(): %r', time.time() - t)
         return entries, count
 
-    def _find_all(self, query, properties):
-        uids = layoutmanager.get_instance().find_all()
-        count = len(uids)
+    def _find_all(self, query, options):
+        object_ids = layoutmanager.get_instance().find_all()
+
+        if not options.get('all_versions', False):
+            tids_vtime = {}
+            for tree_id, version_id in object_ids:
+                tids_vtime.setdefault(tree_id, []).append((version_id,
+                    self._metadata_store.retrieve((tree_id, version_id),
+                                                  'timestamp')))
+
+            object_ids = [(tree_id, sorted(candidates,
+                key=lambda candidate_id: candidate_id[1], reverse=True)[0][0])
+                for (tree_id, candidates) in tids_vtime.items()]
+
+        count = len(object_ids)
 
-        offset = query.get('offset', 0)
-        limit = query.get('limit', MAX_QUERY_LIMIT)
-        uids = uids[offset:offset + limit]
+        offset = options.get('offset', 0)
+        limit = options.get('limit', MAX_QUERY_LIMIT)
+        object_ids = object_ids[offset:offset + limit]
 
         entries = []
-        for uid in uids:
-            metadata = self._metadata_store.retrieve(uid, properties)
+        for object_id in object_ids:
+            metadata = self._metadata_store.retrieve(object_id,
+                                                     options.get('metadata'))
             entries.append(metadata)
 
         return entries, count
 
-    @dbus.service.method(DS_DBUS_INTERFACE,
-                         in_signature='s',
+    @dbus.service.method(DS_DBUS_INTERFACE2,
+                         in_signature='ss',
                          out_signature='s',
-                         sender_keyword='sender')
-    def get_filename(self, uid, sender=None):
-        logging.debug('datastore.get_filename %r', uid)
+                         sender_keyword='sender',
+                         byte_arrays=True)
+    @trace(logger=logger)
+    def get_data(self, tree_id, version_id, sender=None):
+        """Make given entry readable and return path.
+
+        Hardlinks the given object into a global directory, making it readable
+        to the caller. Fully synchronous.
+        Signal GotData(sender, object_id, path) is emitted when data
+        is available. Sender is the bus name of the sender of the get_data()
+        request. GotData() may be sent before get_data() returns.
+        """
+        return self._get_data(tree_id, version_id, sender)
+
+    def _get_data(self, tree_id, version_id, sender=None):
+        object_id = (tree_id, version_id)
         user_id = dbus.Bus().get_unix_user(sender)
-        extension = self._get_extension(uid)
-        return self._file_store.retrieve(uid, user_id, extension)
+        extension = self._get_extension(object_id)
+        path = self._file_store.retrieve(object_id, user_id, extension)
+        self.GotData(sender, tree_id, version_id, path)
+        return path
+
+    @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ssss')
+    def GotData(self, sender, tree_id, version_id, path):
+        """Data is available.
+
+        Data of entry identified by (tree_id, version_id) as requested by
+        sender is available at path now. Permissions are matching the sender,
+        so path may not be accessible to all listeners.
+ """ + pass - def _get_extension(self, uid): - mime_type = self._metadata_store.get_property(uid, 'mime_type') - if mime_type is None or not mime_type: + def _get_extension(self, object_id): + mime_type = self._metadata_store.get_property(object_id, 'mime_type') + if not mime_type: return '' return mime.get_primary_extension(mime_type) - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='a{sv}') - def get_properties(self, uid): - logging.debug('datastore.get_properties %r', uid) - metadata = self._metadata_store.retrieve(uid) - return metadata - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}', + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='a{sv}s', out_signature='as') - def get_uniquevaluesfor(self, propertyname, query=None): - if propertyname != 'activity': - raise ValueError('Only ''activity'' is a supported property name') + def find_unique_values(self, query, name): + """Collect unique values of property. + + Similar to find(), but returns the set of unique values for the + requested property. + """ + return self._find_unique_values(query, name) + + def _find_unique_values(self, query, name): + if name != 'activity': + raise ValueError('Only "activity" is a supported property name') if query: raise ValueError('The query parameter is not supported') if not self._index_updating: @@ -371,57 +688,48 @@ class DataStore(dbus.service.Object): logging.warning('Index updating, returning an empty list') return [] - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='') - def delete(self, uid): - self._optimizer.remove(uid) + @dbus.service.method(DS_DBUS_INTERFACE2, + in_signature='ss', + out_signature='', + byte_arrays=True) + def delete(self, tree_id, version_id): + """Delete entry. + + Remove given object from data store. Fully synchronous. Doesn't + return anything, emits signal Deleted(tree_id, version_id). 
+ """ + return self._delete(tree_id, version_id) - self._index_store.delete(uid) - self._file_store.delete(uid) - self._metadata_store.delete(uid) + def _delete(self, tree_id, version_id): + object_id = (tree_id, version_id) + self._optimizer.remove(object_id) - entry_path = layoutmanager.get_instance().get_entry_path(uid) + self._index_store.delete(object_id) + self._file_store.delete(object_id) + self._metadata_store.delete(object_id) + + entry_path = layoutmanager.get_instance().get_entry_path(object_id) os.removedirs(entry_path) - self.Deleted(uid) - logger.debug('deleted %s', uid) + self.Deleted(*object_id) + logger.debug('deleted %r', object_id) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Deleted(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE2, signature='ss') + def Deleted(self, tree_id, version_id): + """Entry has been deleted.""" pass def stop(self): """shutdown the service""" + self._stop() + + def _stop(self): self._index_store.close_index() self.Stopped() - @dbus.service.signal(DS_DBUS_INTERFACE) + @dbus.service.signal(DS_DBUS_INTERFACE2) def Stopped(self): pass - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="sa{sv}", - out_signature='s') - def mount(self, uri, options=None): - return '' - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="", - out_signature="aa{sv}") - def mounts(self): - return [{'id': 1}] - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="s", - out_signature="") - def unmount(self, mountpoint_id): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Mounted(self, descriptior): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Unmounted(self, descriptor): - pass + def _gen_uuid(self): + return str(uuid.uuid4()) diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index 592c41a..ef6aba8 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -32,15 +32,16 @@ class FileStore(object): # TODO: add protection against store and retrieve operations on entries # that are being processed async. - def store(self, uid, file_path, transfer_ownership, completion_cb): + def store(self, object_id, file_path, transfer_ownership, completion_cb): """Store a file for a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + layout_manager = layoutmanager.get_instance() + dir_path = layout_manager.get_entry_path(object_id) if not os.path.exists(dir_path): os.makedirs(dir_path) - destination_path = layoutmanager.get_instance().get_data_path(uid) + destination_path = layout_manager.get_data_path(object_id) if file_path: if not os.path.isfile(file_path): raise ValueError('No file at %r' % file_path) @@ -89,15 +90,19 @@ class FileStore(object): unlink_src) async_copy.start() - def retrieve(self, uid, user_id, extension): + def has_data(self, object_id): + file_path = layoutmanager.get_instance().get_data_path(object_id) + return os.path.exists(file_path) + + def retrieve(self, object_id, user_id, extension): """Place the file associated to a given entry into a directory where the user can read it. The caller is reponsible for deleting this file. 
""" - file_path = layoutmanager.get_instance().get_data_path(uid) + file_path = layoutmanager.get_instance().get_data_path(object_id) if not os.path.exists(file_path): - logging.debug('Entry %r doesnt have any file', uid) + logging.debug('Entry %r doesnt have any file', object_id) return '' if extension is None: @@ -125,7 +130,7 @@ class FileStore(object): if use_instance_dir: os.umask(old_umask) - fd, destination_path = tempfile.mkstemp(prefix=uid + '_', + fd, destination_path = tempfile.mkstemp(prefix='%s-%s_' % object_id, suffix=extension, dir=destination_dir) os.close(fd) os.unlink(destination_path) @@ -143,21 +148,21 @@ class FileStore(object): return destination_path - def get_file_path(self, uid): - return layoutmanager.get_instance().get_data_path(uid) + def get_file_path(self, object_id): + return layoutmanager.get_instance().get_data_path(object_id) - def delete(self, uid): + def delete(self, object_id): """Remove the file associated to a given entry. """ - file_path = layoutmanager.get_instance().get_data_path(uid) + file_path = layoutmanager.get_instance().get_data_path(object_id) if os.path.exists(file_path): os.remove(file_path) - def hard_link_entry(self, new_uid, existing_uid): - existing_file = layoutmanager.get_instance().get_data_path( - existing_uid) - new_file = layoutmanager.get_instance().get_data_path(new_uid) + def hard_link_entry(self, new_object_id, existing_object_id): + layout_manager = layoutmanager.get_instance() + existing_file = layout_manager.get_data_path(existing_object_id) + new_file = layout_manager.get_data_path(new_object_id) logging.debug('removing %r', new_file) os.remove(new_file) @@ -183,8 +188,10 @@ class AsyncCopy(object): self.size = 0 def _cleanup(self): - os.close(self.src_fp) - os.close(self.dest_fp) + if self.src_fp != -1: + os.close(self.src_fp) + if self.dest_fp != -1: + os.close(self.dest_fp) os.chmod(self.dest, 0400) def _copy_block(self, user_data=None): @@ -208,9 +215,10 @@ class AsyncCopy(object): self._complete(None) return False except Exception, err: - logging.error('AC: Error copying %s -> %s: %r', self.src, self. 
-                dest, err)
-            self._complete(err)
+            logging.exception('AC: Error copying %s -> %s', self.src,
+                self.dest)
+            self._cleanup()
+            self.completion(err)
             return False
 
         return True
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
index 80a1ade..c9cd052 100644
--- a/src/carquinyol/indexstore.py
+++ b/src/carquinyol/indexstore.py
@@ -25,16 +25,18 @@ from xapian import WritableDatabase, Document, Enquire, Query
 from carquinyol import layoutmanager
 from carquinyol.layoutmanager import MAX_QUERY_LIMIT
 
-_VALUE_UID = 0
+_VALUE_TREE_ID = 0
 _VALUE_TIMESTAMP = 1
 _VALUE_TITLE = 2
-# 3 reserved for version support
+_VALUE_VERSION_ID = 3
 _VALUE_FILESIZE = 4
 _VALUE_CREATION_TIME = 5
 
 _PREFIX_NONE = 'N'
 _PREFIX_FULL_VALUE = 'F'
-_PREFIX_UID = 'Q'
+_PREFIX_OBJECT_ID = 'O'
+_PREFIX_TREE_ID = 'Q'
+_PREFIX_VERSION_ID = 'V'
 _PREFIX_ACTIVITY = 'A'
 _PREFIX_ACTIVITY_ID = 'I'
 _PREFIX_MIME_TYPE = 'M'
@@ -51,7 +53,8 @@ _PROPERTIES_NOT_TO_INDEX = ['timestamp', 'preview']
 _MAX_RESULTS = int(2 ** 31 - 1)
 
 _QUERY_TERM_MAP = {
-    'uid': _PREFIX_UID,
+    'tree_id': _PREFIX_TREE_ID,
+    'version_id': _PREFIX_VERSION_ID,
     'activity': _PREFIX_ACTIVITY,
     'activity_id': _PREFIX_ACTIVITY_ID,
     'mime_type': _PREFIX_MIME_TYPE,
@@ -257,34 +260,36 @@ class IndexStore(object):
         for f in os.listdir(index_path):
             os.remove(os.path.join(index_path, f))
 
-    def contains(self, uid):
-        postings = self._database.postlist(_PREFIX_FULL_VALUE + \
-            _PREFIX_UID + uid)
+    def contains(self, object_id):
+        postings = self._database.postlist(self._object_id_term(object_id))
         try:
             __ = postings.next()
         except StopIteration:
             return False
         return True
 
-    def store(self, uid, properties):
+    def store(self, object_id, properties):
+        tree_id, version_id = object_id
+        id_term = self._object_id_term(object_id)
         document = Document()
-        document.add_value(_VALUE_UID, uid)
+        document.add_value(_VALUE_TREE_ID, tree_id)
+        document.add_value(_VALUE_VERSION_ID, version_id)
+        document.add_term(id_term)
         term_generator = TermGenerator()
         term_generator.index_document(document, properties)
 
-        if not self.contains(uid):
-            self._database.add_document(document)
+        if self.contains(object_id):
+            self._database.replace_document(id_term, document)
         else:
-            self._database.replace_document(_PREFIX_FULL_VALUE + \
-                _PREFIX_UID + uid, document)
+            self._database.add_document(document)
         self._flush()
 
-    def find(self, query):
-        offset = query.pop('offset', 0)
-        limit = query.pop('limit', MAX_QUERY_LIMIT)
-        order_by = query.pop('order_by', [])
-        query_string = query.pop('query', None)
+    def find(self, query, query_string, options):
+        offset = options.pop('offset', 0)
+        limit = options.pop('limit', MAX_QUERY_LIMIT)
+        order_by = options.pop('order_by', [])
+        all_versions = options.pop('all_versions', False)
 
         query_parser = QueryParser()
         query_parser.set_database(self._database)
@@ -300,38 +305,101 @@ class IndexStore(object):
         order_by = order_by[0]
 
         if order_by == '+timestamp':
-            enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
-            enquire.set_docid_order(False)
+            order_by_value = _VALUE_TIMESTAMP
+            order_by_direction = True
         elif order_by == '-timestamp':
-            enquire.set_sort_by_value(_VALUE_TIMESTAMP, False)
-            enquire.set_docid_order(True)
+            order_by_value = _VALUE_TIMESTAMP
+            order_by_direction = False
         elif order_by == '+title':
-            enquire.set_sort_by_value(_VALUE_TITLE, True)
+            order_by_value = _VALUE_TITLE
+            order_by_direction = True
        elif order_by == '-title':
-            enquire.set_sort_by_value(_VALUE_TITLE, False)
+            order_by_value = _VALUE_TITLE
+            order_by_direction = False
        elif order_by == '+filesize':
-            enquire.set_sort_by_value(_VALUE_FILESIZE, True)
+            order_by_value = _VALUE_FILESIZE
+            order_by_direction = True
         elif order_by == '-filesize':
-            enquire.set_sort_by_value(_VALUE_FILESIZE, False)
+            order_by_value = _VALUE_FILESIZE
+            order_by_direction = False
         elif order_by == '+creation_time':
-            enquire.set_sort_by_value(_VALUE_CREATION_TIME, True)
+            order_by_value = _VALUE_CREATION_TIME
+            order_by_direction = True
         elif order_by == '-creation_time':
-            enquire.set_sort_by_value(_VALUE_CREATION_TIME, False)
+            order_by_value = _VALUE_CREATION_TIME
+            order_by_direction = False
         else:
+            order_by_value = _VALUE_TIMESTAMP
+            order_by_direction = True
             logging.warning('Unsupported property for sorting: %s', order_by)
             order_by = '+timestamp'
 
-        query_result = enquire.get_mset(offset, limit, check_at_least)
-        total_count = query_result.get_matches_estimated()
+        logging.debug('order_by=%r, order_by_value=%r, order_by_direction=%r',
+            order_by, order_by_value, order_by_direction)
+        enquire.set_sort_by_value(order_by_value, reverse=order_by_direction)
+        enquire.set_docid_order({True: enquire.DESCENDING,
+            False: enquire.ASCENDING}[order_by_direction])
+
+        if not all_versions:
+            enquire.set_collapse_key(_VALUE_TREE_ID)
+
+        if all_versions or (order_by == '+timestamp'):
+            logging.debug('using Xapian for sorting')
+#            query_result = enquire.get_mset(offset, limit, check_at_least)
+            # FIXME: work around Xapian returning incorrect match counts
+            query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT)
+        else:
+            # Xapian doesn't support using a different sort order while
+            # collapsing (which needs to be timestamp in our case), so
+            # we need to query everything and sort+limit ourselves.
+            logging.debug('using Xapian for collapsing only')
+            enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
+            enquire.set_docid_order(enquire.ASCENDING)
+            query_result = enquire.get_mset(0, MAX_QUERY_LIMIT, MAX_QUERY_LIMIT)
+
+        total_count = query_result.get_matches_lower_bound()
+        documents = [hit.document for hit in query_result]
+
+        if (not all_versions) and (order_by != '+timestamp'):
+            logging.debug('sorting in Python')
+
+            def _cmp(a, b):
+                value_a = a.get_value(order_by_value)
+                value_b = b.get_value(order_by_value)
+                if value_a < value_b:
+                    return -1
+                elif value_a > value_b:
+                    return 1
+                elif a.get_docid() < b.get_docid():
+                    return -1
+                elif a.get_docid() > b.get_docid():
+                    return 1
+                return 0
+
+            documents.sort(cmp=_cmp, reverse=order_by_direction)
+            documents = documents[offset:offset+limit]
+        else:
+            # FIXME: work around Xapian returning incorrect match counts
+            logging.debug('doing offset/limit in Python (%r results, offset %r, limit %r)',
+                len(documents), offset, limit)
+            documents = documents[offset:offset+limit]
+
+        object_ids = []
+        for document in documents:
+            object_ids.append((document.get_value(_VALUE_TREE_ID),
+                document.get_value(_VALUE_VERSION_ID)))
 
-        uids = []
-        for hit in query_result:
-            uids.append(hit.document.get_value(_VALUE_UID))
+        return (object_ids, total_count)
 
-        return (uids, total_count)
+    def delete(self, object_id):
+        object_id_term = self._object_id_term(object_id)
 
-    def delete(self, uid):
-        self._database.delete_document(_PREFIX_FULL_VALUE + _PREFIX_UID + uid)
+        enquire = Enquire(self._database)
+        enquire.set_query(Query(object_id_term))
+        query_results = enquire.get_mset(0, 2, 2)
+        documents = [hit.document for hit in query_results]
+        assert len(documents) == 1
+
+        self._database.delete_document(object_id_term)
         self._flush()
 
     def get_activities(self):
@@ -341,6 +409,9 @@ class IndexStore(object):
                 activities.append(term.term[len(prefix):])
         return activities
 
+    def _object_id_term(self, object_id):
+        return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id
+
     def flush(self):
         self._flush(True)
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
index 3179a98..afb9e42 100644
--- a/src/carquinyol/layoutmanager.py
+++ b/src/carquinyol/layoutmanager.py
@@ -58,15 +58,21 @@ class LayoutManager(object):
         version_path = os.path.join(self._root_path, 'version')
         open(version_path, 'w').write(str(version))
 
-    def get_entry_path(self, uid):
+    def get_entry_path(self, object_id):
         # os.path.join() is just too slow
-        return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+        tree_id, version_id = object_id
+        return '%s/%s/%s/%s' % (self._root_path, tree_id[:2], tree_id,
+            version_id)
 
-    def get_data_path(self, uid):
-        return '%s/%s/%s/data' % (self._root_path, uid[:2], uid)
+    def get_data_path(self, object_id):
+        tree_id, version_id = object_id
+        return '%s/%s/%s/%s/data' % (self._root_path, tree_id[:2], tree_id,
+            version_id)
 
-    def get_metadata_path(self, uid):
-        return '%s/%s/%s/metadata' % (self._root_path, uid[:2], uid)
+    def get_metadata_path(self, object_id):
+        tree_id, version_id = object_id
+        return '%s/%s/%s/%s/metadata' % (self._root_path, tree_id[:2], tree_id,
+            version_id)
 
     def get_root_path(self):
         return self._root_path
@@ -81,13 +87,24 @@ class LayoutManager(object):
         return os.path.join(self.get_checksums_dir(), 'queue')
 
     def find_all(self):
-        uids = []
-        for f in os.listdir(self._root_path):
-            if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
-                for g in os.listdir(os.path.join(self._root_path, f)):
-                    if len(g) == 36:
-                        uids.append(g)
-        return uids
+        object_ids = []
+        for tree_2 in os.listdir(self._root_path):
+            tree_2_path = os.path.join(self._root_path, tree_2)
+            if not (os.path.isdir(tree_2_path) and len(tree_2) == 2):
+                continue
+
+            for tree_id in os.listdir(tree_2_path):
+                if len(tree_id) != 36:
+                    continue
+
+                tree_path = os.path.join(tree_2_path, tree_id)
+                for version_id in os.listdir(tree_path):
+                    if len(version_id) != 36:
+                        continue
+
+                    object_ids.append((tree_id, version_id))
+
+        return object_ids
 
     def is_empty(self):
         """Check if there is any existing entry.
@@ -98,13 +115,21 @@ class LayoutManager(object):
             # unmigrated 0.82 data store
             return False
 
-        for f in os.listdir(self._root_path):
-            if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
-                for g in os.listdir(os.path.join(self._root_path, f)):
-                    if len(g) == 36:
-                        return False
+        for tree_2 in os.listdir(self._root_path):
+            tree_2_path = os.path.join(self._root_path, tree_2)
+            if not (os.path.isdir(tree_2_path) and len(tree_2) == 2):
+                continue
+
+            for tree_id in os.listdir(tree_2_path):
+                if len(tree_id) != 36:
+                    continue
+
+                tree_path = os.path.join(tree_2_path, tree_id)
+                return False
+
         return True
 
+
 _instance = None
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
index 5967017..c8ba56d 100644
--- a/src/carquinyol/metadatastore.py
+++ b/src/carquinyol/metadatastore.py
@@ -9,8 +9,9 @@ _INTERNAL_KEYS = ['checksum']
 
 class MetadataStore(object):
 
-    def store(self, uid, metadata):
-        metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+    def store(self, object_id, metadata):
+        layout_manager = layoutmanager.get_instance()
+        metadata_path = layout_manager.get_metadata_path(object_id)
         if not os.path.exists(metadata_path):
             os.makedirs(metadata_path)
         else:
@@ -18,14 +19,10 @@ class MetadataStore(object):
                 if key not in _INTERNAL_KEYS:
                     os.remove(os.path.join(metadata_path, key))
 
-        metadata['uid'] = uid
+        tree_id, version_id = object_id
+        metadata['tree_id'] = tree_id
+        metadata['version_id'] = version_id
         for key, value in metadata.items():
-
-            # Hack to support activities that still pass properties named as
-            # for example title:text.
-            if ':' in key:
-                key = key.split(':', 1)[0]
-
             f = open(os.path.join(metadata_path, key), 'w')
             try:
                 if isinstance(value, unicode):
@@ -36,25 +33,31 @@ class MetadataStore(object):
             finally:
                 f.close()
 
-    def retrieve(self, uid, properties=None):
-        metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+    def retrieve(self, object_id, properties=None):
+        layout_manager = layoutmanager.get_instance()
+        metadata_path = layout_manager.get_metadata_path(object_id)
+        if properties:
+            return metadatareader.retrieve(metadata_path, list(properties))
         return metadatareader.retrieve(metadata_path, properties)
 
-    def delete(self, uid):
-        metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+    def delete(self, object_id):
+        layout_manager = layoutmanager.get_instance()
+        metadata_path = layout_manager.get_metadata_path(object_id)
         for key in os.listdir(metadata_path):
             os.remove(os.path.join(metadata_path, key))
         os.rmdir(metadata_path)
 
-    def get_property(self, uid, key):
-        metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+    def get_property(self, object_id, key):
+        layout_manager = layoutmanager.get_instance()
+        metadata_path = layout_manager.get_metadata_path(object_id)
         property_path = os.path.join(metadata_path, key)
         if os.path.exists(property_path):
             return open(property_path, 'r').read()
         else:
             return None
 
-    def set_property(self, uid, key, value):
-        metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
+    def set_property(self, object_id, key, value):
+        layout_manager = layoutmanager.get_instance()
+        metadata_path = layout_manager.get_metadata_path(object_id)
         property_path = os.path.join(metadata_path, key)
         open(property_path, 'w').write(value)
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
index 1745f2c..d36fb09 100644
--- a/src/carquinyol/migration.py
+++ b/src/carquinyol/migration.py
@@ -23,10 +23,12 @@ import shutil
 import time
 
 import cjson
+import uuid
 
-from carquinyol import layoutmanager
+from carquinyol import layoutmanager, metadatastore
 
 DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
+mstore = metadatastore.MetadataStore()
+
 
 def migrate_from_0():
@@ -38,22 +40,24 @@ def migrate_from_0():
         return
 
     for f in os.listdir(old_root_path):
-        uid, ext = os.path.splitext(f)
+        tree_id, ext = os.path.splitext(f)
         if ext != '.metadata':
             continue
 
-        logging.debug('Migrating entry %r', uid)
-
-        new_entry_dir = layoutmanager.get_instance().get_metadata_path(uid)
+        logging.debug('Migrating entry %r', tree_id)
+        version_id = _gen_uuid()
+        object_id = (tree_id, version_id)
+        new_entry_dir = layoutmanager.get_instance().get_metadata_path(
+            object_id)
         if not os.path.exists(new_entry_dir):
             os.makedirs(new_entry_dir)
 
         try:
-            _migrate_metadata(root_path, old_root_path, uid)
-            _migrate_file(root_path, old_root_path, uid)
-            _migrate_preview(root_path, old_root_path, uid)
+            _migrate_metadata_0(root_path, old_root_path, object_id)
+            _migrate_file_0(root_path, old_root_path, object_id)
+            _migrate_preview_0(root_path, old_root_path, object_id)
         except Exception:
-            logging.exception('Error while migrating entry %r', uid)
+            logging.exception('Error while migrating entry %r', object_id)
 
     # Just be paranoid, it's cheap.
     if old_root_path.endswith('datastore/store'):
@@ -62,13 +66,16 @@ def migrate_from_0():
     logging.info('Migration finished')
 
 
-def _migrate_metadata(root_path, old_root_path, uid):
-    metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
-    old_metadata_path = os.path.join(old_root_path, uid + '.metadata')
+def _migrate_metadata_0(root_path, old_root_path, object_id):
+    tree_id, version_id = object_id
+    dir_path = layoutmanager.get_instance().get_entry_path(object_id)
+    metadata_path = layoutmanager.get_instance().get_metadata_path(object_id)
+    old_metadata_path = os.path.join(old_root_path, tree_id + '.metadata')
     metadata = cjson.decode(open(old_metadata_path, 'r').read())
 
-    if 'uid' not in metadata:
-        metadata['uid'] = uid
+    metadata.pop('uid', None)
+    metadata['tree_id'] = tree_id
+    metadata['version_id'] = version_id
 
     if 'timestamp' not in metadata and 'mtime' in metadata:
         metadata['timestamp'] = \
@@ -87,17 +94,89 @@ def _migrate_metadata(root_path, old_root_path, uid):
             f.close()
         except Exception:
             logging.exception(
-                'Error while migrating property %s of entry %s', key, uid)
+                'Error while migrating property %r of entry %r', key,
+                object_id)
 
 
-def _migrate_file(root_path, old_root_path, uid):
-    if os.path.exists(os.path.join(old_root_path, uid)):
-        new_data_path = layoutmanager.get_instance().get_data_path(uid)
-        os.rename(os.path.join(old_root_path, uid),
+def _migrate_file_0(root_path, old_root_path, object_id):
+    tree_id, version_id_ = object_id
+    if os.path.exists(os.path.join(old_root_path, tree_id)):
+        new_data_path = layoutmanager.get_instance().get_data_path(object_id)
+        os.rename(os.path.join(old_root_path, tree_id),
             new_data_path)
 
 
-def _migrate_preview(root_path, old_root_path, uid):
-    metadata_path = layoutmanager.get_instance().get_metadata_path(uid)
-    os.rename(os.path.join(old_root_path, 'preview', uid),
+def _migrate_preview_0(root_path, old_root_path, object_id):
+    tree_id, version_id_ = object_id
+    dir_path = layoutmanager.get_instance().get_entry_path(object_id)
+    metadata_path = layoutmanager.get_instance().get_metadata_path(object_id)
+    os.rename(os.path.join(old_root_path, 'preview', tree_id),
         os.path.join(metadata_path, 'preview'))
+
+
+def migrate_from_1():
+    logging.info('Migrating datastore from version 1 or 2')
+
+    layout_manager = layoutmanager.get_instance()
+    root_path = layout_manager.get_root_path()
+    checksum_path = layout_manager.get_checksums_dir()
+
+    version_ids = {}
+    for tree_2 in os.listdir(root_path):
+        if len(tree_2) != 2:
+            continue
+
+        for tree_id in os.listdir(os.path.join(root_path, tree_2)):
+            if (len(tree_id) != 36):
+                continue
+
+            logging.debug('Migrating entry %r' % tree_id)
+
+            version_id = _gen_uuid()
+            version_ids[tree_id] = version_id
+            try:
+                old_path = os.path.join(root_path, tree_2, tree_id)
+                tmp_path = os.path.join(root_path, tree_2, tree_id+'.tmp')
+                os.rename(old_path, tmp_path)
+                try:
+                    new_path = layout_manager.get_entry_path((tree_id, version_id))
+                    new_metadata_path = layout_manager.get_metadata_path((tree_id,
+                        version_id))
+                    os.makedirs(os.path.dirname(new_path))
+                    os.rename(tmp_path, new_path)
+                except:
+                    os.rename(tmp_path, old_path)
+                    raise
+
+                metadata = mstore.retrieve((tree_id, version_id))
+                metadata.pop('uid', None)
+                metadata['tree_id'] = tree_id
+                metadata['version_id'] = version_id
+                metadata['parent_id'] = ''
+                metadata['timestamp'] = metadata.get('timestamp',
+                    os.stat(new_path).st_ctime)
+                mstore.store((tree_id, version_id), metadata)
+
+            except Exception:
+                logging.exception('Error while migrating entry %r', tree_id)
+
+    for checksum in os.listdir(checksum_path):
+        entries_path = os.path.join(checksum_path, checksum)
+        for tree_id in os.listdir(entries_path):
+            if len(tree_id) != 36:
+                continue
+
+            try:
+                os.rename(os.path.join(entries_path, tree_id),
+                    os.path.join(entries_path, '%s,%s' % (tree_id,
+                        version_ids[tree_id])))
+
+            except Exception:
+                logging.exception('Error while migrating checksum entry '
+                    '%r / %r', checksum, tree_id)
+
+    logging.info('Migration finished')
+
+
+def _gen_uuid():
+    return str(uuid.uuid4())
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
index c038c2b..48b05a6 100644
--- a/src/carquinyol/optimizer.py
+++ b/src/carquinyol/optimizer.py
@@ -33,36 +33,36 @@ class Optimizer(object):
         self._metadata_store = metadata_store
         self._enqueue_checksum_id = None
 
-    def optimize(self, uid):
+    def optimize(self, object_id):
         """Add an entry to a queue of entries to be checked for duplicates.
""" - if not os.path.exists(self._file_store.get_file_path(uid)): + if not os.path.exists(self._file_store.get_file_path(object_id)): return - queue_path = layoutmanager.get_instance().get_queue_path() - open(os.path.join(queue_path, uid), 'w').close() - logging.debug('optimize %r', os.path.join(queue_path, uid)) + entry_path = self._queue_entry_path(object_id) + open(entry_path, 'w').close() + logging.debug('queueing %r for optimization', entry_path) if self._enqueue_checksum_id is None: self._enqueue_checksum_id = \ gobject.idle_add(self._process_entry_cb, priority=gobject.PRIORITY_LOW) - def remove(self, uid): + def remove(self, object_id): """Remove any structures left from space optimization """ - checksum = self._metadata_store.get_property(uid, 'checksum') + checksum = self._metadata_store.get_property(object_id, 'checksum') if checksum is None: return checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - checksum_entry_path = os.path.join(checksum_path, uid) + checksum_entry_path = self._checksum_entry_path(checksum, object_id) if os.path.exists(checksum_entry_path): - logging.debug('remove %r', checksum_entry_path) + logging.debug('removing %r', checksum_entry_path) os.remove(checksum_entry_path) if os.path.exists(checksum_path): @@ -73,6 +73,14 @@ class Optimizer(object): if e.errno != errno.ENOTEMPTY: raise + def _queue_entry_path(self, object_id): + queue_path = layoutmanager.get_instance().get_queue_path() + return os.path.join(queue_path, '%s,%s' % object_id) + + def _checksum_entry_path(self, checksum, object_id): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + return os.path.join(checksums_dir, checksum, '%s,%s' % object_id) + def _identical_file_already_exists(self, checksum): """Check if we already have files with this checksum. @@ -81,14 +89,14 @@ class Optimizer(object): checksum_path = os.path.join(checksums_dir, checksum) return os.path.exists(checksum_path) - def _get_uid_from_checksum(self, checksum): + def _get_object_id_from_checksum(self, checksum): """Get an existing entry which file matches checksum. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - first_uid = os.listdir(checksum_path)[0] - return first_uid + first_object_id = os.listdir(checksum_path)[0].split(',') + return tuple(first_object_id) def _create_checksum_dir(self, checksum): """Create directory that tracks files with this same checksum. @@ -99,24 +107,21 @@ class Optimizer(object): logging.debug('create dir %r', checksum_path) os.mkdir(checksum_path) - def _add_checksum_entry(self, uid, checksum): - """Create a file in the checksum dir with the uid of the entry + def _add_checksum_entry(self, object_id, checksum): + """Create a file in the checksum dir with the object_id of the entry """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - - logging.debug('touch %r', os.path.join(checksum_path, uid)) - open(os.path.join(checksum_path, uid), 'w').close() + checksum_entry_path = self._checksum_entry_path(checksum, object_id) + logging.debug('touch %r', checksum_entry_path) + open(checksum_entry_path, 'w').close() - def _already_linked(self, uid, checksum): + def _already_linked(self, object_id, checksum): """Check if this entry's file is already a hard link to the checksums dir. 
""" - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) + checksum_entry_path = self._checksum_entry_path(checksum, object_id) + return os.path.exists(checksum_entry_path) def _process_entry_cb(self): """Process one item in the checksums queue by calculating its checksum, @@ -127,30 +132,31 @@ class Optimizer(object): queue_path = layoutmanager.get_instance().get_queue_path() queue = os.listdir(queue_path) if queue: - uid = queue[0] - logging.debug('_process_entry_cb processing %r', uid) + object_id = tuple(queue[0].split(',')) + logging.debug('_process_entry_cb processing %r', object_id) - file_in_entry_path = self._file_store.get_file_path(uid) + file_in_entry_path = self._file_store.get_file_path(object_id) if not os.path.exists(file_in_entry_path): - logging.info('non-existent entry in queue: %r', uid) + logging.info('non-existent entry in queue: %r', object_id) else: - checksum = self._calculate_md5sum(file_in_entry_path) - self._metadata_store.set_property(uid, 'checksum', checksum) + checksum = self.calculate_md5sum(file_in_entry_path) + self._metadata_store.set_property(object_id, 'checksum', + checksum) if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - existing_entry_uid = \ - self._get_uid_from_checksum(checksum) + if not self._already_linked(object_id, checksum): + existing_entry_object_id = \ + self._get_object_id_from_checksum(checksum) - self._file_store.hard_link_entry(uid, - existing_entry_uid) + self._file_store.hard_link_entry(object_id, + existing_entry_object_id) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(object_id, checksum) else: self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(object_id, checksum) - os.remove(os.path.join(queue_path, uid)) + os.remove(self._queue_entry_path(object_id)) if len(queue) <= 1: self._enqueue_checksum_id = None @@ -158,7 +164,7 @@ class Optimizer(object): else: return True - def _calculate_md5sum(self, path): + def calculate_md5sum(self, path): """Calculate the md5 checksum of a given file. """ diff --git a/tests/basic_api_v2.txt b/tests/basic_api_v2.txt index d9a38ac..83c2bb2 100644 --- a/tests/basic_api_v2.txt +++ b/tests/basic_api_v2.txt @@ -61,9 +61,13 @@ Change some entries: >>> ds.update(o1_uid, {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'}, '', False) >>> ds.update(o2_uid, {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'}, '', False) >>> ds.update(o3_uid, {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3', 'timestamp': 10000}, '', False) + +Note: This only gives this result with the non-version support compatibility because we didn't update the data, so change_metadata() could be used instead of save(). If we update the data, a new version will be created and because we backdate the timestamp the previous version would still be considered the "latest" one. 
diff --git a/tests/basic_api_v2.txt b/tests/basic_api_v2.txt
index d9a38ac..83c2bb2 100644
--- a/tests/basic_api_v2.txt
+++ b/tests/basic_api_v2.txt
@@ -61,9 +61,14 @@ Change some entries:
 >>> ds.update(o1_uid, {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'}, '', False)
 >>> ds.update(o2_uid, {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'}, '', False)
 >>> ds.update(o3_uid, {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3', 'timestamp': 10000}, '', False)
+
+Note: We only get the result below through the compatibility layer for the old (unversioned) API, and only because the data was not updated, so change_metadata() could be used instead of save(). If the data were updated, a new version would be created, and since the timestamp is backdated, the previous version would still be considered the "latest" one.
 >>> sorted(to_native(ds.find({}, ['title', 'activity'], byte_arrays=True)[0]))
 [{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}]
+
+For reference, the raw (unfiltered) result used to look like this: [dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 1 updated', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest1', variant_level=1)}, signature=dbus.Signature('sv')), dbus.Dictionary({dbus.String(u'title'): dbus.ByteArray('DS test object 2', variant_level=1), dbus.String(u'activity'): dbus.ByteArray('org.sugarlabs.DataStoreTest3', variant_level=1)}, signature=dbus.Signature('sv'))]
+
 Retrieve metadata for a single entry, ignoring variable data:
 >>> d=dict(ds.get_properties(o3_uid, byte_arrays=True))
 >>> del d['uid'], d['timestamp']
@@ -87,7 +92,7 @@ Find entries using "unknown" metadata (=> returns all entries):
 You can specify a (primary) sort order. Please note that the secondary sort order is undefined / implementation-dependent.
 >>> to_native(ds.find({'order_by': ['+title']}, ['title', 'activity'], byte_arrays=True)[0])
-[{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}]
+[{u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}]
 >>> to_native(ds.find({'order_by': ['-title']}, ['title', 'activity'], byte_arrays=True)[0])
 [{u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 1 updated'}, {u'activity': 'org.sugarlabs.DataStoreTest1', u'title': 'DS test object 2'}, {u'activity': 'org.sugarlabs.DataStoreTest3', u'title': 'DS test object 2'}]
diff --git a/tests/basic_api_v3.txt b/tests/basic_api_v3.txt
new file mode 100644
index 0000000..fda2e55
--- /dev/null
+++ b/tests/basic_api_v3.txt
@@ -0,0 +1,188 @@
+>>> import os
+>>> import tempfile
+>>> import time
+>>> import uuid
+
+Define some helper functions:
+>>> def test_unique(items):
+...     return not [True for e in items if items.count(e) > 1]
+>>> def to_native(value):
+...     if isinstance(value, list):
+...         return [to_native(e) for e in value]
+...     elif isinstance(value, dict):
+...         return dict([(to_native(k), to_native(v)) for k, v in value.items()])
+...     elif isinstance(value, unicode):
+...         return unicode(value)
+...     elif isinstance(value, str):
+...         return str(value)
+...     return value
+
+
+Connect to the datastore using D-Bus:
+>>> import dbus
+>>> DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
+>>> DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore2"
+>>> DS_DBUS_PATH = "/org/laptop/sugar/DataStore2"
+>>> bus = dbus.SessionBus()
+>>> ds = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH), DS_DBUS_INTERFACE)
+
+
+Make sure we're starting from an empty datastore:
+>>> assert ds.find({}, [], byte_arrays=True) == ([], 0)
+
+Create some entries to play with (they are listed in timestamp order later on; the last one gets an explicit, backdated timestamp so the timestamp range query below is deterministic):
+>>> o1_oid = ds.save('', '', {'title': 'DS test object 1', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1'}, '', False)
+>>> assert isinstance(o1_oid, tuple) and isinstance(o1_oid[0], basestring) and isinstance(o1_oid[1], basestring)
+>>> o2_oid = ds.save('', '', {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest2'}, '', False)
+>>> assert isinstance(o2_oid, tuple) and isinstance(o2_oid[0], basestring) and isinstance(o2_oid[1], basestring)
+>>> o3_oid = ds.save('', '', {'title': 'DS test object 3', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest2'}, '', False)
+>>> assert isinstance(o3_oid, tuple) and isinstance(o3_oid[0], basestring) and isinstance(o3_oid[1], basestring)
+>>> o4_oid = ds.save('', '', {'title': 'DS test object 4', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest4', 'timestamp': 10000}, '', False)
+>>> assert isinstance(o4_oid, tuple) and isinstance(o4_oid[0], basestring) and isinstance(o4_oid[1], basestring)
+>>> assert test_unique([o1_oid, o2_oid, o3_oid, o4_oid])
+
+
+Check everything is there:
+>>> assert sorted(to_native(ds.find({}, {'metadata': ['title', 'activity']}, byte_arrays=True)[0])) == \
+...     [{u'title': u'DS test object 1', u'activity': u'org.sugarlabs.DataStoreTest1'}, {u'title': u'DS test object 2', u'activity': u'org.sugarlabs.DataStoreTest2'}, {u'title': u'DS test object 3', u'activity': u'org.sugarlabs.DataStoreTest2'}, {u'title': u'DS test object 4', u'activity': u'org.sugarlabs.DataStoreTest4'}]
+>>> ds.get_data(o1_oid[0], o1_oid[1], byte_arrays=True)
+dbus.String(u'')
+>>> ds.get_data(o2_oid[0], o2_oid[1], byte_arrays=True)
+dbus.String(u'')
+>>> ds.get_data(o3_oid[0], o3_oid[1], byte_arrays=True)
+dbus.String(u'')
+>>> ds.get_data(o4_oid[0], o4_oid[1], byte_arrays=True)
+dbus.String(u'')
+
+
+Test find_unique_values():
+>>> sorted(ds.find_unique_values({}, 'activity'))
+[dbus.String(u'org.sugarlabs.DataStoreTest1'), dbus.String(u'org.sugarlabs.DataStoreTest2'), dbus.String(u'org.sugarlabs.DataStoreTest4')]
+
+
+Change some entries:
+>>> o1_oid2 = ds.save(o1_oid[0], o1_oid[1], {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'}, '', False)
+>>> o2_oid2 = ds.save(o2_oid[0], o2_oid[1], {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'}, '', False)
+>>> o3_oid2 = ds.save(o3_oid[0], o3_oid[1], {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3'}, '', False)
+
+>>> assert sorted(to_native(ds.find({}, {'metadata': ['title', 'activity']}, byte_arrays=True)[0])) == \
+...     [{u'title': u'DS test object 1 updated', u'activity': u'org.sugarlabs.DataStoreTest1'}, {u'title': u'DS test object 2', u'activity': u'org.sugarlabs.DataStoreTest1'}, {u'title': u'DS test object 2', u'activity': u'org.sugarlabs.DataStoreTest3'}, {u'title': u'DS test object 4', u'activity': u'org.sugarlabs.DataStoreTest4'}]
+
+
+Retrieve metadata for a single entry, ignoring variable data:
+>>> d=dict(ds.find({'tree_id': o3_oid2[0], 'version_id': o3_oid2[1]}, {}, byte_arrays=True)[0][0])
+>>> del d['tree_id'], d['version_id'], d['timestamp'], d['parent_id'], d['creation_time']
+>>> assert d == {'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3', 'filesize': '0'}
+
+Find entries using "known" metadata:
+>>> assert to_native(ds.find({'mime_type': ['text/plain']}, {'metadata': ['title', 'activity', 'mime_type', 'tags']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'},
+...      {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'},
+...      {'title': 'DS test object 3', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest2'}]
+>>> assert to_native(ds.find({'mime_type': ['text/html']}, {'metadata': ['title', 'activity', 'mime_type', 'tags']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3'},
+...      {'title': 'DS test object 4', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest4'}]
+>>> assert to_native(ds.find({'tree_id': o3_oid[0]}, {'metadata': ['title', 'activity', 'mime_type']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3'}]
+>>> assert to_native(ds.find({'timestamp': (9000, 11000)}, {'metadata': ['title', 'activity', 'mime_type']}, byte_arrays=True)[0]) == \
+...     [{u'title': u'DS test object 4', u'mime_type': u'text/html', u'activity': u'org.sugarlabs.DataStoreTest4'}]
+
+Find all versions of a given object:
+>>> assert to_native(ds.find({'tree_id': o3_oid[0]}, {'metadata': ['title', 'activity', 'mime_type'], 'all_versions': True}, byte_arrays=True)[0]) == \
+...     [{u'activity': u'org.sugarlabs.DataStoreTest3', u'mime_type': u'text/html', u'title': u'DS test object 2'},
+...      {u'title': u'DS test object 3', u'mime_type': u'text/plain', u'activity': u'org.sugarlabs.DataStoreTest2'}]
+
+Find entries using "unknown" metadata (=> returns all entries):
+>>> assert to_native(ds.find({'title': 'DS test object 2'}, {'metadata': ['title', 'activity', 'mime_type', 'tags']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 2', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest3'},
+...      {'title': 'DS test object 2', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'bar baz'},
+...      {'title': 'DS test object 1 updated', 'mime_type': 'text/plain', 'activity': 'org.sugarlabs.DataStoreTest1', 'tags': 'foo'},
+...      {'title': 'DS test object 4', 'mime_type': 'text/html', 'activity': 'org.sugarlabs.DataStoreTest4'}]
+
+Specify a sort order:
+>>> assert to_native(ds.find({}, {'order_by': ['+title'], 'metadata': ['title', 'activity']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 4', 'activity': 'org.sugarlabs.DataStoreTest4'},
+...      {'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest3'},
+...      {'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest1'},
+...      {'title': 'DS test object 1 updated', 'activity': 'org.sugarlabs.DataStoreTest1'}]
+>>> assert to_native(ds.find({}, {'order_by': ['-title'], 'metadata': ['title', 'activity']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 1 updated', 'activity': 'org.sugarlabs.DataStoreTest1'},
+...      {'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest1'},
+...      {'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest3'},
+...      {'title': 'DS test object 4', 'activity': 'org.sugarlabs.DataStoreTest4'}]
+
+Delete the latest version of an entry:
+>>> ds.delete(*o1_oid2)
+>>> assert to_native(ds.find({}, {'metadata': ['title', 'activity']}, byte_arrays=True)[0]) == \
+...     [{'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest3'},
+...      {'title': 'DS test object 2', 'activity': 'org.sugarlabs.DataStoreTest1'},
+...      {'title': 'DS test object 1', 'activity': 'org.sugarlabs.DataStoreTest1'},
+...      {'title': 'DS test object 4', 'activity': 'org.sugarlabs.DataStoreTest4'}]
+
+Create an entry with content:
+>>> dog_content1 = 'The quick brown dog jumped over the lazy fox.'
+>>> dog_props = {'title': 'dog/fox story', 'mime_type': 'text/plain'}
+>>> dog_file = tempfile.NamedTemporaryFile()
+>>> dog_file.write(dog_content1)
+>>> dog_file.flush()
+>>> dog_oid1 = ds.save('', '', dog_props, dog_file.name, False)
+
+Retrieve and verify the entry with content:
+>>> dog_retrieved = ds.get_data(dog_oid1[0], dog_oid1[1], byte_arrays=True)
+>>> assert(file(dog_retrieved).read() == dog_content1)
+>>> os.remove(dog_retrieved)
+
+Update the entry content:
+>>> dog_content2 = 'The quick brown fox jumped over the lazy dog.'
+>>> dog_file.seek(0)
+>>> dog_file.write(dog_content2)
+>>> dog_file.flush()
+>>> dog_oid2 = ds.save(dog_oid1[0], dog_oid1[1], dog_props, dog_file.name, False)
+>>> assert dog_oid1[0] == dog_oid2[0]
+
+Verify updated content:
+>>> dog_retrieved = ds.get_data(dog_oid2[0], dog_oid2[1], byte_arrays=True)
+>>> assert(file(dog_retrieved).read() == dog_content2)
+>>> os.remove(dog_retrieved)
+
+Verify old content is still there:
+>>> dog_retrieved = ds.get_data(dog_oid1[0], dog_oid1[1], byte_arrays=True)
+>>> assert(file(dog_retrieved).read() == dog_content1)
+>>> os.remove(dog_retrieved)
+>>> dog_file.close()
+
+Import an entry from another system, keeping tree_id + version_id intact:
+>>> cat_oid = (str(uuid.uuid4()), str(uuid.uuid4()))
+>>> cat_props = {'title': 'Cat story', 'mime_type': 'text/plain',
+...     'tree_id': cat_oid[0], 'version_id': cat_oid[1]}
+>>> cat_content1 = 'The cat chased the dog.'
+>>> cat_file = tempfile.NamedTemporaryFile()
+>>> cat_file.write(cat_content1)
+>>> cat_file.flush()
+>>> cat_oid_saved = ds.save(cat_oid[0], '', cat_props, cat_file.name, False)
+>>> assert cat_oid == cat_oid_saved
+
+Import a second version of the same entry:
+>>> cat_oid2 = (cat_oid[0], str(uuid.uuid4()))
+>>> cat_props2 = {'title': 'Cat story revised', 'mime_type': 'text/plain',
+...     'tree_id': cat_oid2[0], 'version_id': cat_oid2[1]}
+>>> cat_content2 = 'The cat chased the lazy dog.'
+>>> cat_file.seek(0)
+>>> cat_file.write(cat_content2)
+>>> cat_file.flush()
+>>> cat_oid2_saved = ds.save(cat_oid2[0], cat_oid[1], cat_props2,
+...     cat_file.name, False)
+>>> assert cat_oid2 == cat_oid2_saved
+
+Importing the same version again is an error because tree_id + version_id must
+be unique and the data of a version is supposed to be immutable.
+>>> try:
+...     cat_oid2_saved = ds.save(cat_oid2[0], cat_oid[1], cat_props2,
+...         cat_file.name, False)
+... except dbus.DBusException, exception:
+...     assert exception.get_dbus_name() == \
+...         'org.freedesktop.DBus.Python.ValueError'
+...     print exception.get_dbus_message().strip().split('\n')[-1]
+ValueError: There is an existing entry with the same tree_id and version_id
+>>> cat_file.close()
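To recap the versioning behaviour the new test file exercises: save() always returns a (tree_id, version_id) pair; the tree_id stays stable across updates, while each save on top of an existing version yields a fresh version_id. A hypothetical, condensed sketch of the dog/fox example above (it assumes a running datastore that provides the DataStore2 interface):

    import dbus
    import tempfile

    # Proxy setup as at the top of basic_api_v3.txt.
    bus = dbus.SessionBus()
    ds = dbus.Interface(
        bus.get_object("org.laptop.sugar.DataStore",
                       "/org/laptop/sugar/DataStore2"),
        "org.laptop.sugar.DataStore2")

    data = tempfile.NamedTemporaryFile()
    data.write('version one')
    data.flush()
    tree_id, version_id = ds.save('', '', {'title': 'first'}, data.name, False)

    data.seek(0)
    data.write('version two!')  # at least as long as before, so it fully overwrites
    data.flush()
    # Saving on top of an existing version creates a new version in the same tree:
    tree_id2, version_id2 = ds.save(tree_id, version_id, {'title': 'second'},
                                    data.name, False)
    assert tree_id2 == tree_id and version_id2 != version_id
    data.close()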
diff --git a/tests/runalltests.py b/tests/runalltests.py
index f59200f..f00af3b 100755
--- a/tests/runalltests.py
+++ b/tests/runalltests.py
@@ -33,13 +33,14 @@ logging.basicConfig(level=logging.WARN,
 
 DOCTESTS = [
     "basic_api_v2.txt",
+    "basic_api_v3.txt",
 ]
 DOCTEST_OPTIONS = doctest.ELLIPSIS
 DOCTEST_OPTIONS |= doctest.REPORT_ONLY_FIRST_FAILURE
 
 DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
-DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
-DS_DBUS_PATH = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore2"
+DS_DBUS_PATH = "/org/laptop/sugar/DataStore2"
 
 ENVIRONMENT_WHITELIST = [
     'LD_LIBRARY_PATH',
diff --git a/tests/test_massops.py b/tests/test_massops.py
index ddc5242..bcfa590 100644
--- a/tests/test_massops.py
+++ b/tests/test_massops.py
@@ -10,14 +10,17 @@ import unittest
 
 DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
-DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
-DS_DBUS_PATH = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore2"
+DS_DBUS_PATH = "/org/laptop/sugar/DataStore2"
 NUM_RUNS = int(os.environ.get('MASSOPS_RUNS', '100'))
 
 IGNORE_PROPERTIES = [
     'checksum',
+    'creation_time',
     'number',
+    'parent_id',
     'timestamp',
-    'uid',
+    'tree_id',
+    'version_id',
 ]
 
@@ -37,30 +40,31 @@ class MassOpsTestCase(unittest.TestCase):
         self._datastore = dbus.Interface(self._bus.get_object(DS_DBUS_SERVICE,
             DS_DBUS_PATH), DS_DBUS_INTERFACE)
 
+    _create_content = 'Foo bar\n'*1000
     _create_properties = {
         'title': 'DS test object',
         'mime_type': 'text/plain',
         'activity': 'org.sugarlabs.DataStoreTest1',
+        'filesize': str(len(_create_content)),
     }
-    _create_content = 'Foo bar\n'*1000
 
     def test_create(self):
-        """Run create() lots of times to create new objects."""
+        """Run save() lots of times to create new objects."""
         for i in range(NUM_RUNS):
             content_file = tempfile.NamedTemporaryFile()
             content_file.write(self._create_content)
             content_file.flush()
             properties = self._create_properties.copy()
             properties['number'] = str(i)
-            properties['timestamp'] = time.time()
-            self._datastore.create(properties, content_file.name, False)
+            self._datastore.save('', '', properties,
+                content_file.name, False)
             content_file.close()
 
     @repeat
     def test_find_all(self):
         """Run find() to list all entries."""
-        entries, total_count = self._datastore.find({}, ['number'],
-            byte_arrays=True)
+        entries, total_count = self._datastore.find({},
+            {'metadata': ['number']}, byte_arrays=True)
         self.assertEquals(total_count, NUM_RUNS)
         self.assertEquals(total_count, len(entries))
         for position, entry in enumerate(entries):
@@ -69,8 +73,9 @@ class MassOpsTestCase(unittest.TestCase):
     @repeat
     def test_find_all_reverse_time(self):
         """Run find() to list all entries in reverse chronological order."""
-        entries, total_count = self._datastore.find({'order_by':
-            ['-timestamp']}, ['number'], byte_arrays=True)
+        entries, total_count = self._datastore.find({},
+            {'metadata': ['number'], 'order_by': ['-timestamp']},
+            byte_arrays=True)
         self.assertEquals(total_count, NUM_RUNS)
         self.assertEquals(total_count, len(entries))
         for position, entry in enumerate(entries):
@@ -79,16 +84,18 @@ class MassOpsTestCase(unittest.TestCase):
     @repeat
     def test_find_all_title(self):
         """Run find() to list all entries ordered by title."""
-        entries, total_count = self._datastore.find({'order_by':
-            ['+title']}, ['tree_id'], byte_arrays=True)
+        entries, total_count = self._datastore.find({},
+            {'metadata': ['tree_id'], 'order_by': ['+title']},
+            byte_arrays=True)
         self.assertEquals(total_count, NUM_RUNS)
         self.assertEquals(total_count, len(entries))
 
     @repeat
     def test_find_all_reverse_title(self):
         """Run find() to list all entries ordered by title (reversed)."""
-        entries, total_count = self._datastore.find({'order_by':
-            ['-title']}, ['tree_id'], byte_arrays=True)
+        entries, total_count = self._datastore.find({},
+            {'metadata': ['tree_id'], 'order_by': ['-title']},
+            byte_arrays=True)
         self.assertEquals(total_count, NUM_RUNS)
         self.assertEquals(total_count, len(entries))
 
@@ -97,9 +104,9 @@ class MassOpsTestCase(unittest.TestCase):
         """Run find() to list all entries in small chunks."""
         chunk_size = 30
         for chunk_start in range(0, NUM_RUNS, 30):
-            entries, total_count = self._datastore.find({
-                'offset': chunk_start, 'limit': chunk_size},
-                ['number'], byte_arrays=True)
+            entries, total_count = self._datastore.find({},
+                {'offset': chunk_start, 'limit': chunk_size,
+                'metadata': ['number']}, byte_arrays=True)
             self.assertEquals(len(entries),
                 min(chunk_size, NUM_RUNS-chunk_start))
             self.assertEquals(total_count, NUM_RUNS)
@@ -108,44 +115,51 @@ class MassOpsTestCase(unittest.TestCase):
                 NUM_RUNS-(chunk_start+position)-1)
 
     def test_get_properties(self):
-        """Run get_properties() on all entries and verify result."""
-        for entry in self._datastore.find({}, ['uid'], byte_arrays=True)[0]:
-            properties = self._datastore.get_properties(entry['uid'],
-                byte_arrays=True)
+        """Run find() to retrieve and verify each entry individually."""
+        for entry in self._datastore.find({}, {'metadata': ['tree_id',
+                'version_id']}, byte_arrays=True)[0]:
+
+            properties = self._datastore.find({'tree_id': entry['tree_id'],
+                'version_id': entry['version_id']}, {}, byte_arrays=True)[0][0]
             self._filter_properties(properties)
             self.assertEquals(properties, self._create_properties)
 
-    def test_get_filename(self):
-        """Run get_filename() on all entries and verify content."""
-        for entry in self._datastore.find({}, ['uid'], byte_arrays=True)[0]:
-            filename = self._datastore.get_filename(entry['uid'],
-                byte_arrays=True)
+    def test_get_data(self):
+        """Run get_data() on all entries and verify content."""
+        for entry in self._datastore.find({}, {'metadata': ['tree_id',
+                'version_id']}, byte_arrays=True)[0]:
+
+            filename = self._datastore.get_data(entry['tree_id'],
+                entry['version_id'], byte_arrays=True)
             try:
                 self.assertEquals(file(filename).read(), self._create_content)
             finally:
                 os.remove(filename)
 
+    _update_content = 'Foo bar baz\n'*1000
     _update_properties = {
         'title': 'DS test object (updated)',
         'mime_type': 'text/plain',
         'activity': 'org.sugarlabs.DataStoreTest1',
+        'filesize': str(len(_update_content)),
     }
-    _update_content = 'Foo bar baz\n'*1000
 
     def test_update(self):
         """Update the content of all existing entries"""
         content_file = tempfile.NamedTemporaryFile()
         content_file.write(self._update_content)
         content_file.flush()
-        for entry in self._datastore.find({}, ['uid'], byte_arrays=True)[0]:
-            self._datastore.update(entry['uid'], self._update_properties,
-                content_file.name, False)
+        for entry in self._datastore.find({}, {'metadata': ['tree_id',
+                'version_id']}, byte_arrays=True)[0]:
+
+            self._datastore.save(entry['tree_id'], entry['version_id'],
+                self._update_properties,
+                content_file.name, False)
 
     def test_update_verify(self):
         """Verify test_update() has changed content and metadata of all
         entries."""
-        for entry in self._datastore.find({}, [], byte_arrays=True)[0]:
-            filename = self._datastore.get_filename(entry['uid'],
-                byte_arrays=True)
+        for entry in self._datastore.find({}, {}, byte_arrays=True)[0]:
+            filename = self._datastore.get_data(entry['tree_id'],
+                entry['version_id'], byte_arrays=True)
             self._filter_properties(entry)
             try:
                 self.assertEquals(entry, self._update_properties)
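All of the updated test_massops tests follow the same calling convention: with the DataStore2 interface, find() takes a query dict plus a separate options dict, replacing the old flat property list. A sketch of the option keys exercised in this test suite, reusing the ds proxy from the previous sketch (an illustration of observed usage, not a complete schema; the default value of all_versions is an assumption):

    entries, total_count = ds.find(
        {'mime_type': ['text/plain']},                    # query on indexed metadata
        {'metadata': ['tree_id', 'version_id', 'title'],  # properties to return
         'order_by': ['-timestamp'],                      # primary sort order
         'offset': 0, 'limit': 30,                        # paging, as in test_find_chunked
         'all_versions': False},                          # assumed default: latest version per tree
        byte_arrays=True)
    assert len(entries) <= total_count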
diff --git a/tests/test_migration_v1_v2.py b/tests/test_migration_v2_v3.py
index 82c4cac..bb7a8a0 100644
--- a/tests/test_migration_v1_v2.py
+++ b/tests/test_migration_v2_v3.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-"""Test datastore migration from version 1 to version 2."""
+"""Test datastore migration from version 2 to version 3."""
 
 import dbus
 import decorator
@@ -12,25 +12,28 @@ import uuid
 
 DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
-DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
-DS_DBUS_PATH = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore2"
+DS_DBUS_PATH = "/org/laptop/sugar/DataStore2"
 
 IGNORE_PROPERTIES = [
     'activity_id',
     'checksum',
+    'creation_time',
     'ctime',
     'mtime',
     'number',
+    'parent_id',
     'timestamp',
-    'uid',
+    'tree_id',
+    'version_id',
 ]
 
 
-class MigrationV1V2TestCase(unittest.TestCase):
-    """Test datastore migration from version 1 to version 2."""
+class MigrationV2V3TestCase(unittest.TestCase):
+    """Test datastore migration from version 2 to version 3."""
 
     def __init__(self, *args, **kwargs):
         unittest.TestCase.__init__(self, *args, **kwargs)
-        self._templates = self._v1_properties*10
+        self._templates = self._v2_properties*10
 
     def setUp(self):
         # pylint: disable-msg=C0103
@@ -38,13 +41,13 @@ class MigrationV2V3TestCase(unittest.TestCase):
         base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
         self._root_path = os.path.join(base_dir, 'datastore')
         if not os.path.exists(self._root_path):
-            self._create_v1_datastore()
+            self._create_v2_datastore()
 
         self._bus = dbus.SessionBus()
         self._datastore = dbus.Interface(self._bus.get_object(DS_DBUS_SERVICE,
             DS_DBUS_PATH), DS_DBUS_INTERFACE)
 
-    _v1_properties = [
+    _v2_properties = [
         {
             'title': lambda number: 'DS test object %d' % (number, ),
             'mime_type': 'text/plain',
@@ -81,21 +84,23 @@ class MigrationV2V3TestCase(unittest.TestCase):
         },
         {},
     ]
 
-    def _v1_content(self, num):
+
+    def _v2_content(self, num):
         return ('Foo bar %d\n' % (num, ))*1000
 
-    def _create_v1_datastore(self):
-        """Create a version 1 datastore on disk."""
+    def _create_v2_datastore(self):
+        """Create a version 2 datastore."""
         os.makedirs(self._root_path)
-        file(os.path.join(self._root_path, 'version'), 'w').write('1')
+        file(os.path.join(self._root_path, 'version'), 'w').write('2')
         for i, template in enumerate(self._templates):
             metadata = self._fill_template(template, i)
-            data = self._v1_content(i)
+            data = self._v2_content(i)
             tree_id = str(uuid.uuid4())
-            metadata['uid'] = tree_id
-            metadata['number'] = i
+            if metadata:
+                metadata['uid'] = tree_id
+                metadata['number'] = i
 
-            self._create_v1_entry(tree_id, metadata, data)
+            self._create_v2_entry(tree_id, metadata, data)
 
     def _fill_template(self, template, i):
         metadata = {}
@@ -107,53 +112,65 @@ class MigrationV2V3TestCase(unittest.TestCase):
 
         return metadata
 
-    def _create_v1_entry(self, tree_id, metadata, data):
-        """Create a single version 1 datastore entry."""
+    def _create_v2_entry(self, tree_id, metadata, data):
+        """Create a single version 2 datastore entry."""
         checksum = hashlib.md5(data).hexdigest()
         entry_dir = os.path.join(self._root_path, tree_id[:2], tree_id)
         os.makedirs(entry_dir)
         file(os.path.join(entry_dir, 'data'), 'w').write(data)
-        self._write_v1_metadata(os.path.join(entry_dir, 'metadata'), metadata)
+        self._write_v2_metadata(os.path.join(entry_dir, 'metadata'), metadata)
         checksum_dir = os.path.join(self._root_path, 'checksums', checksum)
         os.makedirs(checksum_dir)
         file(os.path.join(checksum_dir, tree_id), 'w').close()
 
-    def _write_v1_metadata(self, directory, metadata):
+    def _write_v2_metadata(self, directory, metadata):
         os.makedirs(directory)
         for key, value in metadata.items():
             file(os.path.join(directory, key), 'w').write(str(value))
 
     def test_find_all(self):
         """Run find() to list all migrated entries."""
-        entries, count = self._find({}, ['uid'])
+        entries, count = self._datastore.find({}, {'metadata': ['tree_id']},
+            byte_arrays=True)
         self.assertEquals(count, len(self._templates))
 
     def test_get_properties(self):
-        """Run get_properties() on all entries and verify result."""
-        for entry in self._find({}, ['uid'])[0]:
-            properties = self._datastore.get_properties(entry['uid'],
-                byte_arrays=True)
+        """Run find() to retrieve and verify each entry individually."""
+        for entry in self._datastore.find({}, {'metadata': ['tree_id',
+                'version_id']}, byte_arrays=True)[0]:
+
+            properties = self._datastore.find({'tree_id': entry['tree_id'],
+                'version_id': entry['version_id']}, {}, byte_arrays=True)[0][0]
+            if 'number' not in properties:
+                continue
+
             number = int(properties['number'])
             expected = self._fill_template(self._templates[number], number)
+            expected['filesize'] = str(len(self._v2_content(number)))
             self._filter_properties(properties)
             self._filter_properties(expected)
             self.assertEquals(properties, expected)
 
-    def test_get_filename(self):
-        """Run get_filename() on all entries and verify content."""
-        for entry in self._find({}, ['number', 'uid'])[0]:
-            filename = self._datastore.get_filename(entry['uid'],
-                byte_arrays=True)
+    def test_get_data(self):
+        """Run get_data() on all entries and verify content."""
+        for entry in self._datastore.find({}, {'metadata': ['number',
+                'tree_id', 'version_id']}, byte_arrays=True)[0]:
+
+            filename = self._datastore.get_data(entry['tree_id'],
+                entry['version_id'], byte_arrays=True)
            content = file(filename).read()
            os.remove(filename)
+            if 'number' not in entry:
+                self.assertNotEquals(content, '')
+                continue
+
             number = int(entry['number'])
-            expected = self._v1_content(number)
+            expected = self._v2_content(number)
             self.assertEquals(content, expected)
 
-    def _find(self, query, properties):
-        return self._datastore.find(dbus.Dictionary(query, signature='sv'),
-            properties, byte_arrays=True)
+    # TODO: test updating existing entries
+    # TODO: test optimizer
 
     def _filter_properties(self, properties):
         for key in IGNORE_PROPERTIES:
@@ -161,6 +178,6 @@
 
 def suite():
-    test_suite = unittest.TestLoader().loadTestsFromTestCase(MigrationV1V2TestCase)
-    test_suite.__doc__ = MigrationV1V2TestCase.__doc__
+    test_suite = unittest.TestLoader().loadTestsFromTestCase(MigrationV2V3TestCase)
+    test_suite.__doc__ = MigrationV2V3TestCase.__doc__
     return test_suite