diff options
author | Sascha Silbe <sascha@silbe.org> | 2009-08-17 11:30:12 (GMT) |
---|---|---|
committer | Sascha Silbe <sascha@silbe.org> | 2009-08-17 11:30:12 (GMT) |
commit | 0cedda85d11be006638e572c0ab2d4f570ec18e3 (patch) | |
tree | 5d674ae6a1d03c11ab3b70e8cf9874e64007e8b6 | |
parent | 6b679e122c2a45e29b61287eef3c083822535f49 (diff) |
version support single patch (6b679e122c2a45e29b61287eef3c083822535f49..dee44acbb76fd9e5b3226646bbf5c041f8890497)onlyversions-single
-rw-r--r-- | src/carquinyol/datastore.py | 322 | ||||
-rw-r--r-- | src/carquinyol/filestore.py | 77 | ||||
-rw-r--r-- | src/carquinyol/indexstore.py | 108 | ||||
-rw-r--r-- | src/carquinyol/layoutmanager.py | 20 | ||||
-rw-r--r-- | src/carquinyol/metadatareader.c | 24 | ||||
-rw-r--r-- | src/carquinyol/metadatastore.py | 38 | ||||
-rw-r--r-- | src/carquinyol/migration.py | 137 | ||||
-rw-r--r-- | src/carquinyol/optimizer.py | 62 |
8 files changed, 431 insertions, 357 deletions
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py index a118e03..86757aa 100644 --- a/src/carquinyol/datastore.py +++ b/src/carquinyol/datastore.py @@ -22,9 +22,12 @@ import os import traceback import dbus +import dbus.service import gobject from sugar import mime +import sugar.datastore +from sugar.logger import trace from carquinyol import layoutmanager from carquinyol import migration @@ -56,7 +59,12 @@ class DataStore(dbus.service.Object): layout_manager = layoutmanager.get_instance() if layout_manager.get_version() == 0: migration.migrate_from_0() - layout_manager.set_version(1) + layout_manager.set_version(2) + layout_manager.index_updated = False + + if layout_manager.get_version() == 1: + migration.migrate_from_1() + layout_manager.set_version(2) layout_manager.index_updated = False self._metadata_store = MetadataStore() @@ -80,120 +88,136 @@ class DataStore(dbus.service.Object): self._optimizer = Optimizer(self._file_store, self._metadata_store) def _rebuild_index(self): - uids = layoutmanager.get_instance().find_all() - logging.debug('Going to update the index with uids %r' % uids) - gobject.idle_add(lambda: self.__rebuild_index_cb(uids), + tvids = layoutmanager.get_instance().find_all() + logging.debug('Going to update the index with tvids %r' % tvids) + gobject.idle_add(lambda: self.__rebuild_index_cb(tvids), priority=gobject.PRIORITY_LOW) - def __rebuild_index_cb(self, uids): - if uids: - uid = uids.pop() + def __rebuild_index_cb(self, tvids): + if tvids: + (tree_id,version_id) = tvids.pop() - logging.debug('Updating entry %r in index. %d to go.' % \ - (uid, len(uids))) + logging.debug('Updating entry (%r,%r) in index. %d to go.' % \ + (tree_id, version_id, len(tvids))) - if not self._index_store.contains(uid): + if not self._index_store.contains(tree_id,version_id): try: - props = self._metadata_store.retrieve(uid) - self._index_store.store(uid, props) + props = self._metadata_store.retrieve(tree_id,version_id) + self._index_store.store(tree_id, version_id, props) except Exception: - logging.error('Error processing %r\n%s.' \ - % (uid, traceback.format_exc())) + logging.error('Error processing (%r,%r)\n%s.' \ + % (tree_id, version_id, traceback.format_exc())) - if not uids: + if not tvids: logging.debug('Finished updating index.') layoutmanager.get_instance().index_updated = True + self.Ready() return False else: return True - def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \ - (async_cb, async_err_cb, uid, exc)) - if exc is not None: - async_err_cb(exc) - return + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='ssa{sv}sb', + out_signature='ss', + byte_arrays=True) + @trace(logger=logger) + def save(self, tree_id, parent_id, metadata, path, delete_after): + return self._save(tree_id, parent_id, metadata, path, delete_after) - self.Created(uid) - self._optimizer.optimize(uid) - logger.debug("created %s" % uid) - async_cb(uid) + def _save(self, tree_id, parent_id, metadata, path, delete_after): + # TODO: copy docstring from datastore-redesign.html + if (not tree_id) and parent_id : + raise sugar.datastore.InvalidArgumentError("tree_id is empty but parent_id is not") - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}sb', - out_signature='s', - async_callbacks=('async_cb', 'async_err_cb'), - byte_arrays=True) - def create(self, props, file_path, transfer_ownership, - async_cb, async_err_cb): - uid = str(uuid.uuid4()) - logging.debug('datastore.create %r' % uid) - - if not props.get('timestamp', ''): - props['timestamp'] = int(time.time()) - - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, - lambda *args: self._create_completion_cb(async_cb, - async_err_cb, - uid, - *args)) - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Created(self, uid): - pass + if tree_id and not parent_id : + if self._find({'tree_id': tree_id}, {'limit': 1})[1] : + raise sugar.datastore.InvalidArgumentError("no parent_id given but tree_id already exists") + + elif parent_id : + if not self._find({'tree_id': tree_id, 'version_id': parent_id}, {'limit': 1})[1] : + raise sugar.datastore.InvalidArgumentError("given parent_id does not exist") + + if not tree_id : + tree_id = self._gen_uuid() - def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \ - (async_cb, async_err_cb, exc)) + child_id = self._gen_uuid() + + # TODO: create branch if required + + metadata['ctime'] = int(time.time()) + metadata['tree_id'] = tree_id + metadata['parent_id'] = parent_id + metadata['version_id'] = child_id + + self._metadata_store.store(tree_id, child_id, metadata) + # TODO: async + self._index_store.store(tree_id, child_id, metadata) + + if (not path) and self._file_store.has_data(tree_id, parent_id) : + # metadata-only update, reuse parent data + path = self._file_store.retrieve(tree_id, parent_id, os.getuid(), "foo") + + if path: + self._file_store.store(tree_id, child_id, path, delete_after, + lambda *args: self._save_completion_cb(tree_id, parent_id, child_id, *args)) + + return (tree_id, child_id) + + @trace(logger=logger) + def _save_completion_cb(self, tree_id, parent_id, child_id, exc=None): if exc is not None: - async_err_cb(exc) + logging.error("Error during saving of entry (%r,%r,%r):\n%s" % ( + tree_id, parent_id, child_id, traceback.format_exc(),)) + # FIXME: what to do on error? for the async API we already guaranteed ACID return - self.Updated(uid) - self._optimizer.optimize(uid) - logger.debug("updated %s" % uid) - async_cb() + self.Saved(tree_id, parent_id, child_id) + self._optimizer.optimize(tree_id, child_id) + logger.debug("updated %s %s" % (tree_id, child_id)) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="sss") + def Saved(self, tree_id, parent_id, child_id): + # TODO: copy docstring from datastore-redesign.html + pass @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}sb', + in_signature='ssa{sv}', out_signature='', - async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) - def update(self, uid, props, file_path, transfer_ownership, - async_cb, async_err_cb): - logging.debug('datastore.update %r' % uid) - - if not props.get('timestamp', ''): - props['timestamp'] = int(time.time()) - - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - - if os.path.exists(self._file_store.get_file_path(uid)) and \ - (not file_path or os.path.exists(file_path)): - self._optimizer.remove(uid) - self._file_store.store(uid, file_path, transfer_ownership, - lambda *args: self._update_completion_cb(async_cb, - async_err_cb, - uid, - *args)) - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Updated(self, uid): + @trace(logger=logger) + def change_metadata(self, tree_id, version_id, metadata) : + # TODO: copy docstring from datastore-redesign.html + self._metadata_store.store(tree_id, version_id, metadata) + self._index_store.store(tree_id, version_id, metadata) + self.ChangedMetadata(tree_id, version_id, metadata) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ssa{sv}") + def ChangedMetadata(self, tree_id, version_id, metadata): pass @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='a{sv}as', - out_signature='aa{sv}u') - def find(self, query, properties): - logging.debug('datastore.find %r' % query) + in_signature='a{sv}a{sv}', + out_signature='aa{sv}u', + byte_arrays=True) + @trace(logger=logger) + def find(self, query, options): + return self._find(query, options) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}a{sv}', + out_signature='aa{sv}u', + byte_arrays=True) + @trace(logger=logger) + def textsearch(self, querystring, query, options) : + return self._find(query, options, querystring) + + def _find(self, query, options, querystring = None) : t = time.time() + tvids = None if layoutmanager.get_instance().index_updated: try: - uids, count = self._index_store.find(query) + tvids, count = self._index_store.find(query, querystring, options) except Exception: logging.error('Failed to query index, will rebuild\n%s' \ % traceback.format_exc()) @@ -206,76 +230,91 @@ class DataStore(dbus.service.Object): if not layoutmanager.get_instance().index_updated: logging.warning('Index updating, returning all entries') - uids = layoutmanager.get_instance().find_all() - count = len(uids) + tvids = layoutmanager.get_instance().find_all() + if not options.get('all_versions', False) : + # only return latest version for each entry + tids_vtime = {} + for (tree_id, version_id) in tvids : + tids_vtime.setdefault(tree_id, []).append((version_id, self._metadata_store.retrieve(tree_id, version_id, 'timestamp'))) + + tvids = [(tree_id, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0]) + for (tree_id, candidates) in tids_vtime.items()] + + count = len(tvids) - offset = query.get('offset', 0) - limit = query.get('limit', MAX_QUERY_LIMIT) - uids = uids[offset:offset + limit] + offset = options.get('offset', 0) + limit = options.get('limit', MAX_QUERY_LIMIT) + tvids = tvids[offset:offset + limit] +# logger.debug('tvids=%r' % (tvids,)) entries = [] - for uid in uids: - metadata = self._metadata_store.retrieve(uid, properties) + for (tree_id,version_id) in tvids: + metadata = self._metadata_store.retrieve(tree_id, version_id, options.get('metadata')) entries.append(metadata) - logger.debug('find(): %r' % (time.time() - t)) + return entries, count @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='ss', out_signature='s', - sender_keyword='sender') - def get_filename(self, uid, sender=None): - logging.debug('datastore.get_filename %r' % uid) + sender_keyword='sender', + byte_arrays=True) + @trace(logger=logger) + def get_data(self, tree_id, version_id, sender=None): + # TODO: copy docstring from datastore-redesign.html user_id = dbus.Bus().get_unix_user(sender) - extension = self._get_extension(uid) - return self._file_store.retrieve(uid, user_id, extension) + extension = self._get_extension(tree_id, version_id) + # TODO: async + path = self._file_store.retrieve(tree_id, version_id, user_id, extension) + self.GotData(sender, tree_id, version_id, path) + return path + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ssss") + def GotData(self, sender, tree_id, version_id, path) : + pass - def _get_extension(self, uid): - mime_type = self._metadata_store.get_property(uid, 'mime_type') + def _get_extension(self, tree_id, version_id): + mime_type = self._metadata_store.get_property(tree_id, version_id, 'mime_type') if mime_type is None or not mime_type: return '' return mime.get_primary_extension(mime_type) @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='a{sv}') - def get_properties(self, uid): - logging.debug('datastore.get_properties %r' % uid) - metadata = self._metadata_store.retrieve(uid) - return metadata - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}', + in_signature='a{sv}s', out_signature='as') - def get_uniquevaluesfor(self, propertyname, query=None): - if propertyname != 'activity': - raise ValueError('Only ''activity'' is a supported property name') + def find_unique_values(self, query, metadata_name): + # TODO: copy docstring + # TODO: support for arbitrary metadata names and query + if metadata_name != 'bundle_id': + raise ValueError('Only ''bundle_id'' is a supported property name') if query: raise ValueError('The query parameter is not supported') if layoutmanager.get_instance().index_updated: - return self._index_store.get_activities() + return self._index_store.get_bundle_ids() else: logging.warning('Index updating, returning an empty list') return [] @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', - out_signature='') - def delete(self, uid): - self._optimizer.remove(uid) - - self._index_store.delete(uid) - self._file_store.delete(uid) - self._metadata_store.delete(uid) + in_signature='ss', + out_signature='', + byte_arrays=True) + def delete(self, tree_id, version_id): + # TODO: version_id=None/'' => remove all versions + self._optimizer.remove(tree_id, version_id) + + self._index_store.delete(tree_id, version_id) + self._file_store.delete(tree_id, version_id) + self._metadata_store.delete(tree_id, version_id) - entry_path = layoutmanager.get_instance().get_entry_path(uid) + entry_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) os.removedirs(entry_path) - self.Deleted(uid) - logger.debug("deleted %s" % uid) + self.Deleted(tree_id, version_id) + logger.debug("deleted (%r,%r)" % (tree_id,version_id)) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Deleted(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss") + def Deleted(self, tree_id, version_id): pass def stop(self): @@ -288,28 +327,15 @@ class DataStore(dbus.service.Object): pass @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="sa{sv}", - out_signature='s') - def mount(self, uri, options=None): - return '' - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="", - out_signature="aa{sv}") - def mounts(self): - return [{'id': 1}] + in_signature='', + out_signature='b') + def check_ready(self): + return layoutmanager.get_instance().index_updated - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="s", - out_signature="") - def unmount(self, mountpoint_id): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Mounted(self, descriptior): + @dbus.service.signal(DS_DBUS_INTERFACE) + def Ready(self): pass - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Unmounted(self, descriptor): - pass + def _gen_uuid(self) : + return str(uuid.uuid4()) diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index f88c531..eebec8c 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -29,15 +29,15 @@ class FileStore(object): # TODO: add protection against store and retrieve operations on entries # that are being processed async. - def store(self, uid, file_path, transfer_ownership, completion_cb): + def store(self, tree_id, version_id, file_path, transfer_ownership, completion_cb): """Store a file for a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) if not os.path.exists(dir_path): os.makedirs(dir_path) - destination_path = os.path.join(dir_path, 'data') + destination_path = layoutmanager.get_instance().get_data_path(tree_id, version_id) if file_path: if not os.path.isfile(file_path): raise ValueError('No file at %r' % file_path) @@ -75,15 +75,20 @@ class FileStore(object): async_copy = AsyncCopy(file_path, destination_path, completion_cb) async_copy.start() - def retrieve(self, uid, user_id, extension): + def has_data(self, tree_id, version_id) : + dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) + file_path = os.path.join(dir_path, 'data') + return os.path.exists(file_path) + + def retrieve(self, tree_id, version_id, user_id, extension): """Place the file associated to a given entry into a directory where the user can read it. The caller is reponsible for deleting this file. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) - file_path = os.path.join(dir_path, 'data') + dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) + file_path = layoutmanager.get_instance().get_data_path(tree_id, version_id) if not os.path.exists(file_path): - logging.debug('Entry %r doesnt have any file' % uid) + logging.debug('Entry (%r,%r) doesnt have any file' % (tree_id,version_id)) return '' use_instance_dir = os.path.exists('/etc/olpc-security') and \ @@ -105,21 +110,13 @@ class FileStore(object): elif extension: extension = '.' + extension - destination_path = os.path.join(destination_dir, uid + extension) - - attempt = 1 - while os.path.exists(destination_path): - if attempt > 10: - fd_, destination_path = tempfile.mkstemp(prefix=uid, - suffix=extension, - dir=destination_dir) - del fd_ - os.unlink(destination_path) - break - else: - file_name = '%s_%s%s' % (uid, attempt, extension) - destination_path = os.path.join(destination_dir, file_name) - attempt += 1 + base_name = "%s-%s" % (tree_id, version_id) + fd_, destination_path = tempfile.mkstemp(prefix=base_name, + suffix=extension, + dir=destination_dir) + del fd_ + # FIXME + os.unlink(destination_path) # Try to hard link from the original file to the targetpath. This can # fail if the file is in a different filesystem. Do a symlink instead. @@ -128,39 +125,33 @@ class FileStore(object): except OSError, e: if e.errno == errno.EXDEV: os.symlink(file_path, destination_path) - else: - raise - # Try to make the original file readable. This can fail if the file is - # in a FAT filesystem. - try: - os.chmod(file_path, 0604) - except OSError, e: - if e.errno != errno.EPERM: + # Try to make the original file readable. This can fail if the file is + # in a FAT filesystem. + try: + os.chmod(file_path, 0444) + except OSError, e: + if e.errno != errno.EPERM: + raise + else: raise return destination_path - def get_file_path(self, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - return os.path.join(dir_path, 'data') + def get_file_path(self, tree_id, version_id): + return layoutmanager.get_instance().get_data_path(tree_id, version_id) - def delete(self, uid): + def delete(self, tree_id, version_id): """Remove the file associated to a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) - file_path = os.path.join(dir_path, 'data') + file_path = layoutmanager.get_instance().get_data_path(tree_id, version_id) if os.path.exists(file_path): os.remove(file_path) - def hard_link_entry(self, new_uid, existing_uid): - existing_file = os.path.join( - layoutmanager.get_instance().get_entry_path(existing_uid), - 'data') - new_file = os.path.join( - layoutmanager.get_instance().get_entry_path(new_uid), - 'data') + def hard_link_entry(self, new_tree_id, new_version_id, existing_tree_id, existing_version_id): + existing_file = layoutmanager.get_instance().get_data_path(existing_tree_id, existing_version_id) + new_file = layoutmanager.get_instance().get_data_path(new_tree_id, new_version_id) logging.debug('removing %r' % new_file) os.remove(new_file) diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py index 62aebb4..a8061a3 100644 --- a/src/carquinyol/indexstore.py +++ b/src/carquinyol/indexstore.py @@ -24,11 +24,13 @@ from xapian import WritableDatabase, Document, Enquire, Query, QueryParser from carquinyol import layoutmanager from carquinyol.layoutmanager import MAX_QUERY_LIMIT -_VALUE_UID = 0 +_VALUE_TID = 0 _VALUE_TIMESTAMP = 1 +_VALUE_VID = 2 -_PREFIX_UID = 'Q' -_PREFIX_ACTIVITY = 'A' +_PREFIX_TID = 'Q' +_PREFIX_VID = 'V' +_PREFIX_BUNDLE_ID = 'A' _PREFIX_ACTIVITY_ID = 'I' _PREFIX_MIME_TYPE = 'M' _PREFIX_KEEP = 'K' @@ -66,24 +68,28 @@ class IndexStore(object): for f in os.listdir(index_path): os.remove(os.path.join(index_path, f)) - def contains(self, uid): - postings = self._database.postlist(_PREFIX_UID + uid) + def contains(self, tree_id, version_id): + postings = self._database.postlist(_PREFIX_TID + tree_id + _PREFIX_VID + version_id) try: postlist_item = postings.next() except StopIteration: return False return True - def store(self, uid, properties): + def store(self, tree_id, version_id, properties): document = Document() - document.add_term(_PREFIX_UID + uid) - document.add_term(_PREFIX_ACTIVITY + properties.get('activity', '')) + docid = "%s%s%s%s" % (_PREFIX_TID, tree_id, _PREFIX_VID, version_id) + document.add_term(docid) + document.add_term(_PREFIX_TID + tree_id) + document.add_term(_PREFIX_VID + version_id) + document.add_term(_PREFIX_BUNDLE_ID + properties.get('bundle_id', '')) document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', '')) document.add_term(_PREFIX_ACTIVITY_ID + properties.get('activity_id', '')) document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0))) - document.add_value(_VALUE_UID, uid) + document.add_value(_VALUE_TID, tree_id) + document.add_value(_VALUE_VID, version_id) document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp'])) term_generator = xapian.TermGenerator() @@ -103,10 +109,10 @@ class IndexStore(object): term_generator.index_text_without_positions( self._extract_text(properties), 1, '') - if not self.contains(uid): + if not self.contains(tree_id, version_id): self._database.add_document(document) else: - self._database.replace_document(_PREFIX_UID + uid, document) + self._database.replace_document(docid, document) self._flush() def _extract_text(self, properties): @@ -122,33 +128,45 @@ class IndexStore(object): text += value return text - def find(self, query): + def find(self, query, querystring, options): enquire = Enquire(self._database) - enquire.set_query(self._parse_query(query)) - offset = query.get('offset', 0) - limit = query.get('limit', MAX_QUERY_LIMIT) + offset = options.pop('offset', 0) + limit = options.pop('limit', MAX_QUERY_LIMIT) + all_versions = options.pop('all_versions', False) + + enquire.set_query(self._parse_query(query, querystring)) # This will assure that the results count is exact. check_at_least = offset + limit + 1 enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) + if not all_versions : + # only select newest entry (sort order) for each tree_id + enquire.set_collapse_key(_VALUE_TID) + query_result = enquire.get_mset(offset, limit, check_at_least) total_count = query_result.get_matches_estimated() - uids = [] - for hit in query_result: - uids.append(hit.document.get_value(_VALUE_UID)) - - return (uids, total_count) - - def _parse_query(self, query_dict): - logging.debug('_parse_query %r' % query_dict) + tvids = [(hit.document.get_value(_VALUE_TID), + hit.document.get_value(_VALUE_VID)) + for hit in query_result] + + return (tvids, total_count) + + _queryTermMap = { + 'tree_id': _PREFIX_TID, + 'version_id': _PREFIX_VID, + 'bundle_id': _PREFIX_BUNDLE_ID, + 'activity_id': _PREFIX_ACTIVITY_ID, + 'keep': _PREFIX_KEEP, + } + def _parse_query(self, query_dict, query_str): + logging.debug('_parse_query %r %r', query_dict, query_str) queries = [] - query_str = query_dict.pop('query', None) - if query_str is not None: + if query_str: query_parser = QueryParser() query_parser.set_database(self._database) #query_parser.set_default_op(Query.OP_AND) @@ -167,6 +185,12 @@ class IndexStore(object): queries.append(query) + # metadata -> term for simple datatypes (string, bool) + for (m_name, term_prefix) in self._queryTermMap.items() : + m_value = query_dict.pop(m_name, None) + if m_value is not None: + queries.append(Query(term_prefix+str(m_value))) + timestamp = query_dict.pop('timestamp', None) if timestamp is not None: start = str(timestamp.pop('start', 0)) @@ -174,24 +198,6 @@ class IndexStore(object): query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end) queries.append(query) - uid = query_dict.pop('uid', None) - if uid is not None: - queries.append(Query(_PREFIX_UID + uid)) - - activity = query_dict.pop('activity', None) - if activity is not None: - queries.append(Query(_PREFIX_ACTIVITY + activity)) - - activity_id = query_dict.pop('activity_id', None) - if activity_id is not None: - query = Query(_PREFIX_ACTIVITY_ID + activity_id) - queries.append(query) - - keep = query_dict.pop('keep', None) - if keep is not None: - query = Query(_PREFIX_KEEP + str(keep)) - queries.append(query) - mime_type = query_dict.pop('mime_type', None) if mime_type is not None: mime_queries = [] @@ -203,18 +209,18 @@ class IndexStore(object): queries.append(Query('')) if query_dict: - logging.warning('Unknown term(s): %r' % query_dict) + logging.warning('Unknown term(s): %r', query_dict) return Query(Query.OP_AND, queries) - def delete(self, uid): - self._database.delete_document(_PREFIX_UID + uid) + def delete(self, tree_id, version_id): + self._database.delete_document("%s%s%s%s" % (_PREFIX_TID, tree_id, _PREFIX_VID, version_id)) - def get_activities(self): - activities = [] - for term in self._database.allterms(_PREFIX_ACTIVITY): - activities.append(term.term[len(_PREFIX_ACTIVITY):]) - return activities + def get_bundle_ids(self): + bundle_ids = [] + for term in self._database.allterms(_PREFIX_BUNDLE_ID): + bundle_ids.append(term.term[len(_PREFIX_BUNDLE_ID):]) + return bundle_ids def _flush_timeout_cb(self): self._flush(True) diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py index dc3fde6..8be2a11 100644 --- a/src/carquinyol/layoutmanager.py +++ b/src/carquinyol/layoutmanager.py @@ -53,9 +53,17 @@ class LayoutManager(object): version_path = os.path.join(self._root_path, 'version') open(version_path, 'w').write(str(version)) - def get_entry_path(self, uid): + def get_entry_path(self, tree_id, version_id): # os.path.join() is just too slow - return '%s/%s/%s' % (self._root_path, uid[:2], uid) + return '%s/%s/%s-%s' % (self._root_path, tree_id[:2], tree_id, version_id) + + def get_data_path(self, tree_id, version_id) : + # os.path.join() is just too slow + return '%s/%s/%s-%s/data' % (self._root_path, tree_id[:2], tree_id, version_id) + + def get_metadata_path(self, tree_id, version_id) : + # os.path.join() is just too slow + return '%s/%s/%s-%s/metadata' % (self._root_path, tree_id[:2], tree_id, version_id) def get_root_path(self): return self._root_path @@ -85,13 +93,13 @@ class LayoutManager(object): index_updated = property(_is_index_updated, _set_index_updated) def find_all(self): - uids = [] + tvids = [] for f in os.listdir(self._root_path): if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: for g in os.listdir(os.path.join(self._root_path, f)): - if len(g) == 36: - uids.append(g) - return uids + if len(g) == 73: + tvids.append((g[:36], g[37:])) + return tvids _instance = None def get_instance(): diff --git a/src/carquinyol/metadatareader.c b/src/carquinyol/metadatareader.c index 08be17e..edc338a 100644 --- a/src/carquinyol/metadatareader.c +++ b/src/carquinyol/metadatareader.c @@ -8,7 +8,7 @@ static PyObject *byte_array_type = NULL; int -add_property(char *metadata_path, char *property_name, PyObject *dict, +add_property(const char *metadata_path, char *property_name, PyObject *dict, int must_exist) { int file_path_size; @@ -125,7 +125,7 @@ cleanup: } static PyObject * -read_from_properties_list (char *metadata_path, PyObject *properties) +read_from_properties_list (const char *metadata_path, PyObject *properties) { PyObject *dict = PyDict_New(); @@ -148,7 +148,7 @@ cleanup: } static PyObject * -read_all_properties (char *metadata_path) +read_all_properties (const char *metadata_path) { PyObject *dict = PyDict_New(); DIR *dir_stream = NULL; @@ -198,34 +198,22 @@ metadatareader_retrieve(PyObject *unused, PyObject *args) { PyObject *dict = NULL; PyObject *properties = NULL; - const char *dir_path = NULL; - char *metadata_path = NULL; + const char *metadata_path = NULL; - if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties)) + if (!PyArg_ParseTuple(args, "sO:retrieve", &metadata_path, &properties)) return NULL; - // Build path to the metadata directory - int metadata_path_size = strlen(dir_path) + 10; - metadata_path = PyMem_Malloc(metadata_path_size); - if (metadata_path == NULL) { - PyErr_NoMemory(); - return NULL; - } - snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata"); - if ((properties != Py_None) && (PyList_Size(properties) > 0)) { dict = read_from_properties_list(metadata_path, properties); } else { dict = read_all_properties(metadata_path); } - PyMem_Free(metadata_path); - return dict; } static PyMethodDef metadatareader_functions[] = { - {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a file")}, + {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a directory with a single file (containing the content) per key")}, {NULL, NULL, 0, NULL} }; diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py index 50981f3..ffec4a8 100644 --- a/src/carquinyol/metadatastore.py +++ b/src/carquinyol/metadatastore.py @@ -6,19 +6,17 @@ from carquinyol import metadatareader MAX_SIZE = 256 class MetadataStore(object): - def store(self, uid, metadata): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - if not os.path.exists(dir_path): - os.makedirs(dir_path) - - metadata_path = os.path.join(dir_path, 'metadata') + def store(self, tree_id, version_id, metadata): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) if not os.path.exists(metadata_path): os.makedirs(metadata_path) else: for key in os.listdir(metadata_path): - os.remove(os.path.join(metadata_path, key)) + if key not in metadata : + os.remove(os.path.join(metadata_path, key)) - metadata['uid'] = uid + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id for key, value in metadata.items(): # Hack to support activities that still pass properties named as for @@ -36,29 +34,29 @@ class MetadataStore(object): finally: f.close() - def retrieve(self, uid, properties=None): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - return metadatareader.retrieve(dir_path, properties) + def retrieve(self, tree_id, version_id, properties=None): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) + if not os.path.exists(metadata_path) : + return None + + return metadatareader.retrieve(metadata_path, properties) - def delete(self, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - metadata_path = os.path.join(dir_path, 'metadata') + def delete(self, tree_id, version_id): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) for key in os.listdir(metadata_path): os.remove(os.path.join(metadata_path, key)) os.rmdir(metadata_path) - def get_property(self, uid, key): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - metadata_path = os.path.join(dir_path, 'metadata') + def get_property(self, tree_id, version_id, key): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) property_path = os.path.join(metadata_path, key) if os.path.exists(property_path): return open(property_path, 'r').read() else: return None - def set_property(self, uid, key, value): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - metadata_path = os.path.join(dir_path, 'metadata') + def set_property(self, tree_id, version_id, key, value): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) property_path = os.path.join(metadata_path, key) open(property_path, 'w').write(value) diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py index 228db2a..d979b5e 100644 --- a/src/carquinyol/migration.py +++ b/src/carquinyol/migration.py @@ -22,15 +22,19 @@ import logging import shutil import time import traceback +import uuid import cjson -from carquinyol import layoutmanager +from carquinyol import layoutmanager, metadatastore DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' +mstore = metadatastore.MetadataStore() + + def migrate_from_0(): - logging.info('Migrating datastore from version 0 to version 1') + logging.info('Migrating datastore from version 0 to version 2') root_path = layoutmanager.get_instance().get_root_path() old_root_path = os.path.join(root_path, 'store') @@ -38,18 +42,19 @@ def migrate_from_0(): return for f in os.listdir(old_root_path): - uid, ext = os.path.splitext(f) + tree_id, ext = os.path.splitext(f) if ext != '.metadata': continue - logging.debug('Migrating entry %r' % uid) + logging.debug('Migrating entry %r' % tree_id) + version_id = str(uuid.uuid4()) try: - _migrate_metadata(root_path, old_root_path, uid) - _migrate_file(root_path, old_root_path, uid) - _migrate_preview(root_path, old_root_path, uid) + _migrate_metadata_0(root_path, old_root_path, tree_id, version_id) + _migrate_file_0(root_path, old_root_path, tree_id, version_id) + _migrate_preview_0(root_path, old_root_path, tree_id, version_id) except Exception: logging.error('Error while migrating entry %r: %s\n' % \ - (uid, traceback.format_exc())) + (tree_id, traceback.format_exc())) # Just be paranoid, it's cheap. if old_root_path.endswith('datastore/store'): @@ -57,46 +62,94 @@ def migrate_from_0(): logging.info('Migration finished') -def _migrate_metadata(root_path, old_root_path, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - metadata_path = os.path.join(dir_path, 'metadata') - os.makedirs(metadata_path) +def _migrate_metadata_0(root_path, old_root_path, tree_id, version_id): + dir_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) - old_metadata_path = os.path.join(old_root_path, uid + '.metadata') + old_metadata_path = os.path.join(old_root_path, tree_id + '.metadata') metadata = cjson.decode(open(old_metadata_path, 'r').read()) - if 'uid' not in metadata: - metadata['uid'] = uid + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id + metadata['parent_id'] = '' if 'timestamp' not in metadata and 'mtime' in metadata: - metadata['timestamp'] = \ + metadata['ctime'] = \ time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT)) + elif 'timestamp' in metadata : + metadata['ctime'] = metadata.pop('timestamp') + else : + metadata['ctime'] = os.stat(old_metadata_path).st_ctime + + try: + mstore.store(tree_id, version_id, metadata) + + except: + logging.error( + 'Error while migrating property entry %s: %s\n' % \ + (tree_id, traceback.format_exc())) + +def _migrate_file_0(root_path, old_root_path, tree_id, version_id): + if os.path.exists(os.path.join(old_root_path, tree_id)): + new_data_path = layoutmanager.get_instance().get_data_path(tree_id, version_id) + os.rename(os.path.join(old_root_path, tree_id), + new_data_path) + +def _migrate_preview_0(root_path, old_root_path, tree_id, version_id): + metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) + os.rename(os.path.join(old_root_path, 'preview', tree_id), + os.path.join(metadata_path, 'preview')) - for key, value in metadata.items(): - try: - f = open(os.path.join(metadata_path, key), 'w') + +def migrate_from_1(): + logging.info('Migrating datastore from version 1 to version 2') + + root_path = layoutmanager.get_instance().get_root_path() + checksum_path = layoutmanager.get_instance().get_checksums_dir() + + version_ids = {} + for hash02 in os.listdir(root_path): + if len(hash02) != 2 : + continue + + for tree_id in os.listdir(os.path.join(root_path, hash02)) : + if (len(tree_id) != 36) : + continue + + logging.debug('Migrating entry %r' % tree_id) + + version_id = str(uuid.uuid4()) + version_ids[tree_id] = version_id try: - if isinstance(value, unicode): - value = value.encode('utf-8') - if not isinstance(value, basestring): - value = str(value) - f.write(value) - finally: - f.close() - except Exception: - logging.error( - 'Error while migrating property %s of entry %s: %s\n' % \ - (key, uid, traceback.format_exc())) - -def _migrate_file(root_path, old_root_path, uid): - if os.path.exists(os.path.join(old_root_path, uid)): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - os.rename(os.path.join(old_root_path, uid), - os.path.join(dir_path, 'data')) - -def _migrate_preview(root_path, old_root_path, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) - metadata_path = os.path.join(dir_path, 'metadata') - os.rename(os.path.join(old_root_path, 'preview', uid), - os.path.join(metadata_path, 'preview')) + new_path = layoutmanager.get_instance().get_entry_path(tree_id, version_id) + new_metadata_path = layoutmanager.get_instance().get_metadata_path(tree_id, version_id) + os.rename(os.path.join(root_path, hash02, tree_id), + new_path) + metadata = mstore.retrieve(tree_id, version_id) + metadata['tree_id'] = tree_id + metadata['version_id'] = version_id + metadata['parent_id'] = '' + metadata['ctime'] = metadata.get('timestamp', os.stat(new_path).st_ctime) + metadata.pop('uid') + mstore.store(tree_id, version_id, metadata) + + except Exception: + logging.error('Error while migrating entry %r: %s\n' % \ + (tree_id, traceback.format_exc())) + + for checksum in os.listdir(checksum_path) : + entries_path = os.path.join(checksum_path, checksum) + for tree_id in os.listdir(entries_path) : + if len(tree_id) != 36 : + continue + + try : + os.rename(os.path.join(entries_path, tree_id), + os.path.join(entries_path, "%s-%s" % (tree_id, version_ids[tree_id]))) + + except Exception: + logging.error('Error while migrating checksum entry %r / %r: %s\n' % \ + (checksum, tree_id, traceback.format_exc())) + + logging.info('Migration finished') diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py index f8a2e3e..19849e6 100644 --- a/src/carquinyol/optimizer.py +++ b/src/carquinyol/optimizer.py @@ -18,11 +18,13 @@ import os import errno import logging import subprocess +import uuid import gobject from carquinyol import layoutmanager +# TODO: use layoutmanager for entries in 'checksums' directory class Optimizer(object): """Optimizes disk space usage by detecting duplicates and sharing storage. """ @@ -31,33 +33,34 @@ class Optimizer(object): self._metadata_store = metadata_store self._enqueue_checksum_id = None - def optimize(self, uid): + def optimize(self, tree_id, version_id): """Add an entry to a queue of entries to be checked for duplicates. """ - if not os.path.exists(self._file_store.get_file_path(uid)): + if not os.path.exists(self._file_store.get_file_path(tree_id, version_id)): return queue_path = layoutmanager.get_instance().get_queue_path() - open(os.path.join(queue_path, uid), 'w').close() - logging.debug('optimize %r' % os.path.join(queue_path, uid)) + fname = os.path.join(queue_path, "%s-%s" % (tree_id, version_id)) + open(fname, 'w').close() + logging.debug('optimize %r' % fname) if self._enqueue_checksum_id is None: self._enqueue_checksum_id = \ gobject.idle_add(self._process_entry_cb, priority=gobject.PRIORITY_LOW) - def remove(self, uid): + def remove(self, tree_id, version_id): """Remove any structures left from space optimization """ - checksum = self._metadata_store.get_property(uid, 'checksum') + checksum = self._metadata_store.get_property(tree_id, version_id, 'checksum') if checksum is None: return checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - checksum_entry_path = os.path.join(checksum_path, uid) + checksum_entry_path = os.path.join(checksum_path, "%s-%s" % (tree_id,version_id)) if os.path.exists(checksum_entry_path): logging.debug('remove %r' % checksum_entry_path) @@ -79,14 +82,14 @@ class Optimizer(object): checksum_path = os.path.join(checksums_dir, checksum) return os.path.exists(checksum_path) - def _get_uid_from_checksum(self, checksum): + def _get_uvid_from_checksum(self, checksum): """Get an existing entry which file matches checksum. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - first_uid = os.listdir(checksum_path)[0] - return first_uid + first_uvid = os.listdir(checksum_path)[0] + return (first_uvid[:36], first_uvid[37:]) def _create_checksum_dir(self, checksum): """Create directory that tracks files with this same checksum. @@ -97,24 +100,25 @@ class Optimizer(object): logging.debug('create dir %r' % checksum_path) os.mkdir(checksum_path) - def _add_checksum_entry(self, uid, checksum): - """Create a file in the checksum dir with the uid of the entry + def _add_checksum_entry(self, tree_id, version_id, checksum): + """Create a file in the checksum dir with the tree_id and version_id of the entry """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - logging.debug('touch %r' % os.path.join(checksum_path, uid)) - open(os.path.join(checksum_path, uid), 'w').close() + fname = os.path.join(checksum_path, "%s-%s" % (tree_id,version_id)) + logging.debug('touch %r' % fname) + open(fname, 'w').close() - def _already_linked(self, uid, checksum): + def _already_linked(self, tree_id, version_id, checksum): """Check if this entry's file is already a hard link to the checksums dir. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) + return os.path.exists(os.path.join(checksum_path, "%s-%s" % (tree_id,version_id))) def _process_entry_cb(self): """Process one item in the checksums queue by calculating its checksum, @@ -125,30 +129,30 @@ class Optimizer(object): queue_path = layoutmanager.get_instance().get_queue_path() queue = os.listdir(queue_path) if queue: - uid = queue[0] - logging.debug('_process_entry_cb processing %r' % uid) + (tree_id,version_id) = queue[0][:36], queue[0][37:] + logging.debug('_process_entry_cb processing (%r,%r)' % (tree_id,version_id)) - file_in_entry_path = self._file_store.get_file_path(uid) + file_in_entry_path = self._file_store.get_file_path(tree_id,version_id) if not os.path.exists(file_in_entry_path): - logging.info('non-existent entry in queue: %r' % uid) + logging.info('non-existent entry in queue: (%r,%r)' % (tree_id,version_id)) else: checksum = self._calculate_md5sum(file_in_entry_path) - self._metadata_store.set_property(uid, 'checksum', checksum) + self._metadata_store.set_property(tree_id, version_id, 'checksum', checksum) if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - existing_entry_uid = \ - self._get_uid_from_checksum(checksum) + if not self._already_linked(tree_id, version_id, checksum): + existing_entry_uvid = \ + self._get_uvid_from_checksum(checksum) - self._file_store.hard_link_entry(uid, - existing_entry_uid) + self._file_store.hard_link_entry(tree_id, version_id, + *existing_entry_uvid) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(tree_id, version_id, checksum) else: self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(tree_id, version_id, checksum) - os.remove(os.path.join(queue_path, uid)) + os.remove(os.path.join(queue_path, "%s-%s" % (tree_id,version_id))) if len(queue) <= 1: self._enqueue_checksum_id = None |