diff options
author | Sascha Silbe <sascha@silbe.org> | 2009-06-22 00:30:00 (GMT) |
---|---|---|
committer | Sascha Silbe <sascha@silbe.org> | 2009-06-22 00:30:00 (GMT) |
commit | 2ae002dfe45d59ad1ac3afe955f9d262461f8d6d (patch) | |
tree | 4925557ed385d12416423b04b558ac667388766a | |
parent | 6b679e122c2a45e29b61287eef3c083822535f49 (diff) |
prototype take 1: add version_id / vid everywhere
-rw-r--r-- | src/carquinyol/__init__.py | 7 | ||||
-rw-r--r-- | src/carquinyol/datastore.py | 209 | ||||
-rw-r--r-- | src/carquinyol/filestore.py | 30 | ||||
-rw-r--r-- | src/carquinyol/indexstore.py | 52 | ||||
-rw-r--r-- | src/carquinyol/layoutmanager.py | 12 | ||||
-rw-r--r-- | src/carquinyol/metadatastore.py | 21 | ||||
-rw-r--r-- | src/carquinyol/migration.py | 70 | ||||
-rw-r--r-- | src/carquinyol/optimizer.py | 61 |
8 files changed, 275 insertions, 187 deletions
diff --git a/src/carquinyol/__init__.py b/src/carquinyol/__init__.py index 8b13789..d53da54 100644 --- a/src/carquinyol/__init__.py +++ b/src/carquinyol/__init__.py @@ -1 +1,8 @@ +import logging +import decorator + +@decorator.decorator +def trace(f, *args, **kwargs) : + logging.debug("%s(%s)" % (f.__name__, ", ".join([repr(a) for a in args]+['%s=%r' % (k,v) for (k,v) in kwargs.items()]))), + return f(*args, **kwargs) diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py index a118e03..8a04ca9 100644 --- a/src/carquinyol/datastore.py +++ b/src/carquinyol/datastore.py @@ -26,6 +26,7 @@ import gobject from sugar import mime +from carquinyol import trace from carquinyol import layoutmanager from carquinyol import migration from carquinyol.layoutmanager import MAX_QUERY_LIMIT @@ -56,7 +57,12 @@ class DataStore(dbus.service.Object): layout_manager = layoutmanager.get_instance() if layout_manager.get_version() == 0: migration.migrate_from_0() - layout_manager.set_version(1) + layout_manager.set_version(2) + layout_manager.index_updated = False + + if layout_manager.get_version() == 1: + migration.migrate_from_1() + layout_manager.set_version(2) layout_manager.index_updated = False self._metadata_store = MetadataStore() @@ -80,120 +86,129 @@ class DataStore(dbus.service.Object): self._optimizer = Optimizer(self._file_store, self._metadata_store) def _rebuild_index(self): - uids = layoutmanager.get_instance().find_all() - logging.debug('Going to update the index with uids %r' % uids) - gobject.idle_add(lambda: self.__rebuild_index_cb(uids), + uvids = layoutmanager.get_instance().find_all() + logging.debug('Going to update the index with uvids %r' % uvids) + gobject.idle_add(lambda: self.__rebuild_index_cb(uvids), priority=gobject.PRIORITY_LOW) - def __rebuild_index_cb(self, uids): - if uids: - uid = uids.pop() + def __rebuild_index_cb(self, uvids): + if uvids: + (uid,vid) = uvids.pop() - logging.debug('Updating entry %r in index. %d to go.' % \ - (uid, len(uids))) + logging.debug('Updating entry (%r,%r) in index. %d to go.' % \ + (uid, vid, len(uvids))) - if not self._index_store.contains(uid): + if not self._index_store.contains(uid,vid): try: - props = self._metadata_store.retrieve(uid) - self._index_store.store(uid, props) + props = self._metadata_store.retrieve(uid,vid) + self._index_store.store(uid, vid, props) except Exception: - logging.error('Error processing %r\n%s.' \ - % (uid, traceback.format_exc())) + logging.error('Error processing (%r,%r)\n%s.' \ + % (uid, vid, traceback.format_exc())) - if not uids: + if not uvids: logging.debug('Finished updating index.') layoutmanager.get_instance().index_updated = True return False else: return True - def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \ - (async_cb, async_err_cb, uid, exc)) + @trace + def _create_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None): if exc is not None: async_err_cb(exc) return - self.Created(uid) - self._optimizer.optimize(uid) - logger.debug("created %s" % uid) - async_cb(uid) + self.Created(uid, vid) + self._optimizer.optimize(uid, vid) + logger.debug("created (%s,%s)" % (uid,vid)) + async_cb(uid,vid) @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}sb', - out_signature='s', + out_signature='ss', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) def create(self, props, file_path, transfer_ownership, async_cb, async_err_cb): uid = str(uuid.uuid4()) - logging.debug('datastore.create %r' % uid) + vid = str(uuid.uuid4()) # use fake for now + logging.debug('datastore.create %r %r' % (uid, vid)) if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, + self._metadata_store.store(uid, vid, props) + self._index_store.store(uid, vid, props) + self._file_store.store(uid, vid, file_path, transfer_ownership, lambda *args: self._create_completion_cb(async_cb, async_err_cb, uid, + vid, *args)) + return (uid, vid) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Created(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss") + def Created(self, uid, vid): pass - def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): - logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \ - (async_cb, async_err_cb, exc)) + @trace + def _update_completion_cb(self, async_cb, async_err_cb, uid, vid, exc=None): if exc is not None: async_err_cb(exc) return - self.Updated(uid) - self._optimizer.optimize(uid) - logger.debug("updated %s" % uid) - async_cb() + self.Updated(uid,vid) + self._optimizer.optimize(uid,vid) + logger.debug("updated %s %s" % (uid, vid)) + async_cb(uid, vid) @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='sa{sv}sb', - out_signature='', + in_signature='ssa{sv}sb', + out_signature='ss', async_callbacks=('async_cb', 'async_err_cb'), byte_arrays=True) - def update(self, uid, props, file_path, transfer_ownership, + @trace + def update(self, uid, vid, props, file_path, transfer_ownership, async_cb, async_err_cb): - logging.debug('datastore.update %r' % uid) if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) - self._metadata_store.store(uid, props) - self._index_store.store(uid, props) + # TODO: create branch if required (inside some abstraction layer) + if file_path : + # only for data updates + vid = str(uuid.uuid4()) # use fake for now - if os.path.exists(self._file_store.get_file_path(uid)) and \ - (not file_path or os.path.exists(file_path)): - self._optimizer.remove(uid) - self._file_store.store(uid, file_path, transfer_ownership, + self._metadata_store.store(uid, vid, props) + self._index_store.store(uid, vid, props) + +# if os.path.exists(self._file_store.get_file_path(uid, vid)) and \ +# (not file_path or os.path.exists(file_path)): +# self._optimizer.remove(uid, vid) + self._file_store.store(uid, vid, file_path, transfer_ownership, lambda *args: self._update_completion_cb(async_cb, async_err_cb, uid, + vid, *args)) + return (uid, vid) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Updated(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss") + def Updated(self, uid, vid): pass @dbus.service.method(DS_DBUS_INTERFACE, in_signature='a{sv}as', out_signature='aa{sv}u') + @trace def find(self, query, properties): - logging.debug('datastore.find %r' % query) t = time.time() + uvids = None if layoutmanager.get_instance().index_updated: try: - uids, count = self._index_store.find(query) + uvids, count = self._index_store.find(query) except Exception: logging.error('Failed to query index, will rebuild\n%s' \ % traceback.format_exc()) @@ -203,45 +218,56 @@ class DataStore(dbus.service.Object): self._index_store.open_index() self._rebuild_index() - if not layoutmanager.get_instance().index_updated: + if uvids is None : logging.warning('Index updating, returning all entries') - uids = layoutmanager.get_instance().find_all() - count = len(uids) + uvids = layoutmanager.get_instance().find_all() + if not query.get('all_versions', False) : + # only return latest version for each entry + uids_vtime = {} + for (uid, vid) in uvids : + uids_vtime.setdefault(uid, []).append((vid, self._metadata_store.retrieve(uid, vid, 'timestamp'))) + + uvids = [(uid, sorted(candidates, key=lambda e: e[1], reverse=True)[0][0]) + for (uid, candidates) in uids_vtime.items()] + + count = len(uvids) offset = query.get('offset', 0) limit = query.get('limit', MAX_QUERY_LIMIT) - uids = uids[offset:offset + limit] + uvids = uvids[offset:offset + limit] + logger.debug('uvids=%r' % (uvids,)) entries = [] - for uid in uids: - metadata = self._metadata_store.retrieve(uid, properties) + for (uid,vid) in uvids: + metadata = self._metadata_store.retrieve(uid, vid, properties) entries.append(metadata) logger.debug('find(): %r' % (time.time() - t)) + logger.debug('count=%r, entries=%r' % (count, entries)) return entries, count @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='ss', out_signature='s', sender_keyword='sender') - def get_filename(self, uid, sender=None): - logging.debug('datastore.get_filename %r' % uid) + @trace + def get_filename(self, uid, vid, sender=None): user_id = dbus.Bus().get_unix_user(sender) - extension = self._get_extension(uid) - return self._file_store.retrieve(uid, user_id, extension) + extension = self._get_extension(uid,vid) + return self._file_store.retrieve(uid, vid, user_id, extension) - def _get_extension(self, uid): - mime_type = self._metadata_store.get_property(uid, 'mime_type') + def _get_extension(self, uid, vid): + mime_type = self._metadata_store.get_property(uid, vid, 'mime_type') if mime_type is None or not mime_type: return '' return mime.get_primary_extension(mime_type) @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='ss', out_signature='a{sv}') - def get_properties(self, uid): - logging.debug('datastore.get_properties %r' % uid) - metadata = self._metadata_store.retrieve(uid) + @trace + def get_properties(self, uid, vid): + metadata = self._metadata_store.retrieve(uid,vid) return metadata @dbus.service.method(DS_DBUS_INTERFACE, @@ -259,23 +285,24 @@ class DataStore(dbus.service.Object): return [] @dbus.service.method(DS_DBUS_INTERFACE, - in_signature='s', + in_signature='ss', out_signature='') - def delete(self, uid): - self._optimizer.remove(uid) + def delete(self, uid, vid): + # TODO: vid=None/'' => remove all versions + self._optimizer.remove(uid, vid) - self._index_store.delete(uid) - self._file_store.delete(uid) - self._metadata_store.delete(uid) + self._index_store.delete(uid, vid) + self._file_store.delete(uid, vid) + self._metadata_store.delete(uid, vid) - entry_path = layoutmanager.get_instance().get_entry_path(uid) + entry_path = layoutmanager.get_instance().get_entry_path(uid, vid) os.removedirs(entry_path) - self.Deleted(uid) - logger.debug("deleted %s" % uid) + self.Deleted(uid, vid) + logger.debug("deleted (%r,%r)" % (uid,vid)) - @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") - def Deleted(self, uid): + @dbus.service.signal(DS_DBUS_INTERFACE, signature="ss") + def Deleted(self, uid, vid): pass def stop(self): @@ -287,29 +314,3 @@ class DataStore(dbus.service.Object): def Stopped(self): pass - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="sa{sv}", - out_signature='s') - def mount(self, uri, options=None): - return '' - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="", - out_signature="aa{sv}") - def mounts(self): - return [{'id': 1}] - - @dbus.service.method(DS_DBUS_INTERFACE, - in_signature="s", - out_signature="") - def unmount(self, mountpoint_id): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Mounted(self, descriptior): - pass - - @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") - def Unmounted(self, descriptor): - pass - diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py index f88c531..7a4591c 100644 --- a/src/carquinyol/filestore.py +++ b/src/carquinyol/filestore.py @@ -29,11 +29,11 @@ class FileStore(object): # TODO: add protection against store and retrieve operations on entries # that are being processed async. - def store(self, uid, file_path, transfer_ownership, completion_cb): + def store(self, uid, vid, file_path, transfer_ownership, completion_cb): """Store a file for a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) if not os.path.exists(dir_path): os.makedirs(dir_path) @@ -75,15 +75,15 @@ class FileStore(object): async_copy = AsyncCopy(file_path, destination_path, completion_cb) async_copy.start() - def retrieve(self, uid, user_id, extension): + def retrieve(self, uid, vid, user_id, extension): """Place the file associated to a given entry into a directory where the user can read it. The caller is reponsible for deleting this file. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) file_path = os.path.join(dir_path, 'data') if not os.path.exists(file_path): - logging.debug('Entry %r doesnt have any file' % uid) + logging.debug('Entry (%r,%r) doesnt have any file' % (uid,vid)) return '' use_instance_dir = os.path.exists('/etc/olpc-security') and \ @@ -105,19 +105,19 @@ class FileStore(object): elif extension: extension = '.' + extension - destination_path = os.path.join(destination_dir, uid + extension) + destination_path = os.path.join(destination_dir, "%s-%s%s" % (uid, vid, extension)) attempt = 1 while os.path.exists(destination_path): if attempt > 10: - fd_, destination_path = tempfile.mkstemp(prefix=uid, + fd_, destination_path = tempfile.mkstemp(prefix="%s-%s" % (uid,vid), suffix=extension, dir=destination_dir) del fd_ os.unlink(destination_path) break else: - file_name = '%s_%s%s' % (uid, attempt, extension) + file_name = '%s-%s_%s%s' % (uid, vid, attempt, extension) destination_path = os.path.join(destination_dir, file_name) attempt += 1 @@ -141,25 +141,25 @@ class FileStore(object): return destination_path - def get_file_path(self, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def get_file_path(self, uid, vid): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) return os.path.join(dir_path, 'data') - def delete(self, uid): + def delete(self, uid, vid): """Remove the file associated to a given entry. """ - dir_path = layoutmanager.get_instance().get_entry_path(uid) + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) file_path = os.path.join(dir_path, 'data') if os.path.exists(file_path): os.remove(file_path) - def hard_link_entry(self, new_uid, existing_uid): + def hard_link_entry(self, new_uid, new_vid, existing_uid, existing_vid): existing_file = os.path.join( - layoutmanager.get_instance().get_entry_path(existing_uid), + layoutmanager.get_instance().get_entry_path(existing_uid, existing_vid), 'data') new_file = os.path.join( - layoutmanager.get_instance().get_entry_path(new_uid), + layoutmanager.get_instance().get_entry_path(new_uid, new_vid), 'data') logging.debug('removing %r' % new_file) diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py index 62aebb4..a814f7a 100644 --- a/src/carquinyol/indexstore.py +++ b/src/carquinyol/indexstore.py @@ -26,8 +26,10 @@ from carquinyol.layoutmanager import MAX_QUERY_LIMIT _VALUE_UID = 0 _VALUE_TIMESTAMP = 1 +_VALUE_VID = 2 _PREFIX_UID = 'Q' +_PREFIX_VID = 'V' _PREFIX_ACTIVITY = 'A' _PREFIX_ACTIVITY_ID = 'I' _PREFIX_MIME_TYPE = 'M' @@ -66,17 +68,19 @@ class IndexStore(object): for f in os.listdir(index_path): os.remove(os.path.join(index_path, f)) - def contains(self, uid): - postings = self._database.postlist(_PREFIX_UID + uid) + def contains(self, uid, vid): + postings = self._database.postlist(_PREFIX_UID + uid + _PREFIX_VID + vid) try: postlist_item = postings.next() except StopIteration: return False return True - def store(self, uid, properties): + def store(self, uid, vid, properties): document = Document() + document.add_term("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid)) document.add_term(_PREFIX_UID + uid) + document.add_term(_PREFIX_VID + vid) document.add_term(_PREFIX_ACTIVITY + properties.get('activity', '')) document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', '')) document.add_term(_PREFIX_ACTIVITY_ID + @@ -84,6 +88,7 @@ class IndexStore(object): document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0))) document.add_value(_VALUE_UID, uid) + document.add_value(_VALUE_VID, vid) document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp'])) term_generator = xapian.TermGenerator() @@ -103,10 +108,10 @@ class IndexStore(object): term_generator.index_text_without_positions( self._extract_text(properties), 1, '') - if not self.contains(uid): + if not self.contains(uid, vid): self._database.add_document(document) else: - self._database.replace_document(_PREFIX_UID + uid, document) + self._database.replace_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid), document) self._flush() def _extract_text(self, properties): @@ -124,10 +129,12 @@ class IndexStore(object): def find(self, query): enquire = Enquire(self._database) - enquire.set_query(self._parse_query(query)) - offset = query.get('offset', 0) - limit = query.get('limit', MAX_QUERY_LIMIT) + offset = query.pop('offset', 0) + limit = query.pop('limit', MAX_QUERY_LIMIT) + all_versions = query.pop('all_versions', False) + + enquire.set_query(self._parse_query(query)) # This will assure that the results count is exact. check_at_least = offset + limit + 1 @@ -137,11 +144,25 @@ class IndexStore(object): query_result = enquire.get_mset(offset, limit, check_at_least) total_count = query_result.get_matches_estimated() - uids = [] + uvids = [] + timestamps = [] for hit in query_result: - uids.append(hit.document.get_value(_VALUE_UID)) + uvids.append((hit.document.get_value(_VALUE_UID), hit.document.get_value(_VALUE_VID))) - return (uids, total_count) + if not all_versions : + # only return latest version for each entry + # result set is already sorted by time so we only need to take the first entry for each uid + uids_vid = {} + uvids_new = [] + for (uid, vid) in uvids : + if uid not in uids_vid : + uids_vid[uid] = vid + uvids_new.append((uid,vid)) + + uvids = uvids_new + + # TODO: total_count will be totally off if all_versions is not set + return (uvids, total_count) def _parse_query(self, query_dict): logging.debug('_parse_query %r' % query_dict) @@ -174,10 +195,15 @@ class IndexStore(object): query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end) queries.append(query) + # TODO: refactor / simplify uid = query_dict.pop('uid', None) if uid is not None: queries.append(Query(_PREFIX_UID + uid)) + vid = query_dict.pop('vid', None) + if vid is not None: + queries.append(Query(_PREFIX_VID + vid)) + activity = query_dict.pop('activity', None) if activity is not None: queries.append(Query(_PREFIX_ACTIVITY + activity)) @@ -207,8 +233,8 @@ class IndexStore(object): return Query(Query.OP_AND, queries) - def delete(self, uid): - self._database.delete_document(_PREFIX_UID + uid) + def delete(self, uid, vid): + self._database.delete_document("%s%s%s%s" % (_PREFIX_UID, uid, _PREFIX_VID, vid)) def get_activities(self): activities = [] diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py index dc3fde6..1a75052 100644 --- a/src/carquinyol/layoutmanager.py +++ b/src/carquinyol/layoutmanager.py @@ -53,9 +53,9 @@ class LayoutManager(object): version_path = os.path.join(self._root_path, 'version') open(version_path, 'w').write(str(version)) - def get_entry_path(self, uid): + def get_entry_path(self, uid, vid): # os.path.join() is just too slow - return '%s/%s/%s' % (self._root_path, uid[:2], uid) + return '%s/%s/%s-%s' % (self._root_path, uid[:2], uid, vid) def get_root_path(self): return self._root_path @@ -85,13 +85,13 @@ class LayoutManager(object): index_updated = property(_is_index_updated, _set_index_updated) def find_all(self): - uids = [] + uvids = [] for f in os.listdir(self._root_path): if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: for g in os.listdir(os.path.join(self._root_path, f)): - if len(g) == 36: - uids.append(g) - return uids + if len(g) == 73: + uvids.append((g[:36], g[37:])) + return uvids _instance = None def get_instance(): diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py index 50981f3..b5bfeb1 100644 --- a/src/carquinyol/metadatastore.py +++ b/src/carquinyol/metadatastore.py @@ -6,8 +6,8 @@ from carquinyol import metadatareader MAX_SIZE = 256 class MetadataStore(object): - def store(self, uid, metadata): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def store(self, uid, vid, metadata): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) if not os.path.exists(dir_path): os.makedirs(dir_path) @@ -19,6 +19,7 @@ class MetadataStore(object): os.remove(os.path.join(metadata_path, key)) metadata['uid'] = uid + metadata['vid'] = vid for key, value in metadata.items(): # Hack to support activities that still pass properties named as for @@ -36,19 +37,19 @@ class MetadataStore(object): finally: f.close() - def retrieve(self, uid, properties=None): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def retrieve(self, uid, vid, properties=None): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) return metadatareader.retrieve(dir_path, properties) - def delete(self, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def delete(self, uid, vid): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) metadata_path = os.path.join(dir_path, 'metadata') for key in os.listdir(metadata_path): os.remove(os.path.join(metadata_path, key)) os.rmdir(metadata_path) - def get_property(self, uid, key): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def get_property(self, uid, vid, key): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) metadata_path = os.path.join(dir_path, 'metadata') property_path = os.path.join(metadata_path, key) if os.path.exists(property_path): @@ -56,8 +57,8 @@ class MetadataStore(object): else: return None - def set_property(self, uid, key, value): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + def set_property(self, uid, vid, key, value): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) metadata_path = os.path.join(dir_path, 'metadata') property_path = os.path.join(metadata_path, key) open(property_path, 'w').write(value) diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py index 228db2a..0722e43 100644 --- a/src/carquinyol/migration.py +++ b/src/carquinyol/migration.py @@ -22,6 +22,7 @@ import logging import shutil import time import traceback +import uuid import cjson @@ -30,7 +31,7 @@ from carquinyol import layoutmanager DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' def migrate_from_0(): - logging.info('Migrating datastore from version 0 to version 1') + logging.info('Migrating datastore from version 0 to version 2') root_path = layoutmanager.get_instance().get_root_path() old_root_path = os.path.join(root_path, 'store') @@ -43,10 +44,11 @@ def migrate_from_0(): continue logging.debug('Migrating entry %r' % uid) + vid = str(uuid.uuid4()) try: - _migrate_metadata(root_path, old_root_path, uid) - _migrate_file(root_path, old_root_path, uid) - _migrate_preview(root_path, old_root_path, uid) + _migrate_metadata_0(root_path, old_root_path, uid, vid) + _migrate_file_0(root_path, old_root_path, uid, vid) + _migrate_preview_0(root_path, old_root_path, uid, vid) except Exception: logging.error('Error while migrating entry %r: %s\n' % \ (uid, traceback.format_exc())) @@ -57,8 +59,8 @@ def migrate_from_0(): logging.info('Migration finished') -def _migrate_metadata(root_path, old_root_path, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) +def _migrate_metadata_0(root_path, old_root_path, uid, vid): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) metadata_path = os.path.join(dir_path, 'metadata') os.makedirs(metadata_path) @@ -68,6 +70,8 @@ def _migrate_metadata(root_path, old_root_path, uid): if 'uid' not in metadata: metadata['uid'] = uid + metadata['vid'] = vid + if 'timestamp' not in metadata and 'mtime' in metadata: metadata['timestamp'] = \ time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT)) @@ -88,15 +92,61 @@ def _migrate_metadata(root_path, old_root_path, uid): 'Error while migrating property %s of entry %s: %s\n' % \ (key, uid, traceback.format_exc())) -def _migrate_file(root_path, old_root_path, uid): +def _migrate_file_0(root_path, old_root_path, uid, vid): if os.path.exists(os.path.join(old_root_path, uid)): - dir_path = layoutmanager.get_instance().get_entry_path(uid) + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) os.rename(os.path.join(old_root_path, uid), os.path.join(dir_path, 'data')) -def _migrate_preview(root_path, old_root_path, uid): - dir_path = layoutmanager.get_instance().get_entry_path(uid) +def _migrate_preview_0(root_path, old_root_path, uid, vid): + dir_path = layoutmanager.get_instance().get_entry_path(uid, vid) metadata_path = os.path.join(dir_path, 'metadata') os.rename(os.path.join(old_root_path, 'preview', uid), os.path.join(metadata_path, 'preview')) + +def migrate_from_1(): + logging.info('Migrating datastore from version 1 to version 2') + + root_path = layoutmanager.get_instance().get_root_path() + checksum_path = os.path.join(root_path, "checksums") + + vids = {} + for hash02 in os.listdir(root_path): + if len(hash02) != 2 : + continue + + for uid in os.listdir(os.path.join(root_path, hash02)) : + if (len(uid) != 36) : + continue + + logging.debug('Migrating entry %r' % uid) + + vid = str(uuid.uuid4()) + vids[uid] = vid + try: + new_path = layoutmanager.get_instance().get_entry_path(uid, vid) + os.rename(os.path.join(root_path, hash02, uid), + new_path) + file(os.path.join(new_path, "metadata", "vid"), "w").write(vid) + + except Exception: + logging.error('Error while migrating entry %r: %s\n' % \ + (uid, traceback.format_exc())) + + for checksum in os.listdir(checksum_path) : + entries_path = os.path.join(checksum_path, checksum) + for uid in os.listdir(entries_path) : + if len(uid) != 36 : + continue + + try : + os.rename(os.path.join(entries_path, uid), + os.path.join(entries_path, "%s-%s" % (uid,vids[uid]))) + + except Exception: + logging.error('Error while migrating checksum entry %r / %r: %s\n' % \ + (checksum, uid, traceback.format_exc())) + + logging.info('Migration finished') + diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py index f8a2e3e..c4a0681 100644 --- a/src/carquinyol/optimizer.py +++ b/src/carquinyol/optimizer.py @@ -18,6 +18,7 @@ import os import errno import logging import subprocess +import uuid import gobject @@ -31,33 +32,34 @@ class Optimizer(object): self._metadata_store = metadata_store self._enqueue_checksum_id = None - def optimize(self, uid): + def optimize(self, uid, vid): """Add an entry to a queue of entries to be checked for duplicates. """ - if not os.path.exists(self._file_store.get_file_path(uid)): + if not os.path.exists(self._file_store.get_file_path(uid, vid)): return queue_path = layoutmanager.get_instance().get_queue_path() - open(os.path.join(queue_path, uid), 'w').close() - logging.debug('optimize %r' % os.path.join(queue_path, uid)) + fname = os.path.join(queue_path, "%s-%s" % (uid, vid)) + open(fname, 'w').close() + logging.debug('optimize %r' % fname) if self._enqueue_checksum_id is None: self._enqueue_checksum_id = \ gobject.idle_add(self._process_entry_cb, priority=gobject.PRIORITY_LOW) - def remove(self, uid): + def remove(self, uid, vid): """Remove any structures left from space optimization """ - checksum = self._metadata_store.get_property(uid, 'checksum') + checksum = self._metadata_store.get_property(uid, vid, 'checksum') if checksum is None: return checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - checksum_entry_path = os.path.join(checksum_path, uid) + checksum_entry_path = os.path.join(checksum_path, "%s-%s" % (uid,vid)) if os.path.exists(checksum_entry_path): logging.debug('remove %r' % checksum_entry_path) @@ -79,14 +81,14 @@ class Optimizer(object): checksum_path = os.path.join(checksums_dir, checksum) return os.path.exists(checksum_path) - def _get_uid_from_checksum(self, checksum): + def _get_uvid_from_checksum(self, checksum): """Get an existing entry which file matches checksum. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - first_uid = os.listdir(checksum_path)[0] - return first_uid + first_uvid = os.listdir(checksum_path)[0] + return (first_uvid[:36], first_uvid[37:]) def _create_checksum_dir(self, checksum): """Create directory that tracks files with this same checksum. @@ -97,24 +99,25 @@ class Optimizer(object): logging.debug('create dir %r' % checksum_path) os.mkdir(checksum_path) - def _add_checksum_entry(self, uid, checksum): - """Create a file in the checksum dir with the uid of the entry + def _add_checksum_entry(self, uid, vid, checksum): + """Create a file in the checksum dir with the uid and vid of the entry """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - logging.debug('touch %r' % os.path.join(checksum_path, uid)) - open(os.path.join(checksum_path, uid), 'w').close() + fname = os.path.join(checksum_path, "%s-%s" % (uid,vid)) + logging.debug('touch %r' % fname) + open(fname, 'w').close() - def _already_linked(self, uid, checksum): + def _already_linked(self, uid, vid, checksum): """Check if this entry's file is already a hard link to the checksums dir. """ checksums_dir = layoutmanager.get_instance().get_checksums_dir() checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) + return os.path.exists(os.path.join(checksum_path, "%s-%s" % (uid,vid))) def _process_entry_cb(self): """Process one item in the checksums queue by calculating its checksum, @@ -125,30 +128,30 @@ class Optimizer(object): queue_path = layoutmanager.get_instance().get_queue_path() queue = os.listdir(queue_path) if queue: - uid = queue[0] - logging.debug('_process_entry_cb processing %r' % uid) + (uid,vid) = queue[0][:36], queue[0][37:] + logging.debug('_process_entry_cb processing (%r,%r)' % (uid,vid)) - file_in_entry_path = self._file_store.get_file_path(uid) + file_in_entry_path = self._file_store.get_file_path(uid,vid) if not os.path.exists(file_in_entry_path): - logging.info('non-existent entry in queue: %r' % uid) + logging.info('non-existent entry in queue: (%r,%r)' % (uid,vid)) else: checksum = self._calculate_md5sum(file_in_entry_path) - self._metadata_store.set_property(uid, 'checksum', checksum) + self._metadata_store.set_property(uid, vid, 'checksum', checksum) if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - existing_entry_uid = \ - self._get_uid_from_checksum(checksum) + if not self._already_linked(uid, vid, checksum): + existing_entry_uvid = \ + self._get_uvid_from_checksum(checksum) - self._file_store.hard_link_entry(uid, - existing_entry_uid) + self._file_store.hard_link_entry(uid, vid, + *existing_entry_uvid) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(uid, vid, checksum) else: self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) + self._add_checksum_entry(uid, vid, checksum) - os.remove(os.path.join(queue_path, uid)) + os.remove(os.path.join(queue_path, "%s-%s" % (uid,vid))) if len(queue) <= 1: self._enqueue_checksum_id = None |