diff options
author | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-09-30 16:30:17 (GMT) |
---|---|---|
committer | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-09-30 16:30:17 (GMT) |
commit | 2f325039969506164a2c79f6e73ea7ec56ee71c1 (patch) | |
tree | f04d6fed1a5c93e9062af0da7d7926dc96106426 | |
parent | 8e4d5a80c16beff4aff836a0370a569ac739fe91 (diff) |
Refactor the hardlink optimization out from filestore and store the checksum as a metadata property
-rw-r--r-- | src/olpc/datastore/Makefile.am | 3 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 11 | ||||
-rw-r--r-- | src/olpc/datastore/filestore.py | 166 | ||||
-rw-r--r-- | src/olpc/datastore/metadatastore.py | 21 | ||||
-rw-r--r-- | src/olpc/datastore/migration.py | 2 | ||||
-rw-r--r-- | src/olpc/datastore/optimizer.py | 149 | ||||
-rw-r--r-- | tests/test_sugar.py | 2 |
7 files changed, 191 insertions, 163 deletions
diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am index 6d6c2fe..062e9b0 100644 --- a/src/olpc/datastore/Makefile.am +++ b/src/olpc/datastore/Makefile.am @@ -6,7 +6,8 @@ datastore_PYTHON = \ indexstore.py \ layoutmanager.py \ metadatastore.py \ - migration.py + migration.py \ + optimizer.py AM_CPPFLAGS = \ $(WARN_CFLAGS) \ diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 60b29f5..a7a86d8 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -24,6 +24,7 @@ from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT from olpc.datastore.metadatastore import MetadataStore from olpc.datastore.indexstore import IndexStore from olpc.datastore.filestore import FileStore +from olpc.datastore.optimizer import Optimizer # the name used by the logger DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' @@ -66,6 +67,8 @@ class DataStore(dbus.service.Object): logging.debug('Index is not up-to-date, will update') self._rebuild_index() + self._optimizer = Optimizer(self._file_store, self._metadata_store) + def _rebuild_index(self): uids = layoutmanager.get_instance().find_all() logging.debug('Going to update the index with uids %r' % uids) @@ -98,6 +101,7 @@ class DataStore(dbus.service.Object): return self.Created(uid) + self._optimizer.optimize(uid) logger.debug("created %s" % uid) async_cb(uid) @@ -133,6 +137,7 @@ class DataStore(dbus.service.Object): return self.Updated(uid) + self._optimizer.optimize(uid) logger.debug("updated %s" % uid) async_cb() @@ -148,6 +153,10 @@ class DataStore(dbus.service.Object): self._metadata_store.store(uid, props) self._index_store.store(uid, props) + + if os.path.exists(self._file_store.get_file_path(uid)) and \ + (not file_path or os.path.exists(file_path)): + self._optimizer.remove(uid) self._file_store.store(uid, file_path, transfer_ownership, lambda *args: self._update_completion_cb(async_cb, async_err_cb, @@ -230,6 +239,8 @@ class DataStore(dbus.service.Object): in_signature='s', out_signature='') def delete(self, uid): + self._optimizer.remove(uid) + self._index_store.delete(uid) self._file_store.delete(uid) self._metadata_store.delete(uid) diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py index 2c90fe0..0640403 100644 --- a/src/olpc/datastore/filestore.py +++ b/src/olpc/datastore/filestore.py @@ -10,11 +10,8 @@ class FileStore(object): """Handle the storage of one file per entry. """ - def __init__(self): - self._enqueue_checksum_id = None - - # TODO: add protection against store and retrieve operations on entries - # that are being processed async. + # TODO: add protection against store and retrieve operations on entries + # that are being processed async. def store(self, uid, file_path, transfer_ownership, completion_cb): """Store a file for a given entry. @@ -33,17 +30,15 @@ class FileStore(object): logging.debug('FileStore moving from %r to %r' % \ (file_path, destination_path)) os.rename(file_path, destination_path) - self._enqueue_checksum(uid) completion_cb() except OSError, e: if e.errno == errno.EXDEV: - self._async_copy(uid, file_path, destination_path, - completion_cb) + self._async_copy(file_path, destination_path, + completion_cb) else: raise else: - self._async_copy(uid, file_path, destination_path, - completion_cb) + self._async_copy(file_path, destination_path, completion_cb) elif not file_path and os.path.exists(destination_path): os.remove(destination_path) completion_cb() @@ -51,157 +46,15 @@ class FileStore(object): logging.debug('FileStore: Nothing to do') completion_cb() - def _async_copy(self, uid, file_path, destination_path, completion_cb): + def _async_copy(self, file_path, destination_path, completion_cb): """Start copying a file asynchronously. """ logging.debug('FileStore copying from %r to %r' % \ (file_path, destination_path)) - async_copy = AsyncCopy(file_path, destination_path, - lambda exception: self._async_copy_completion_cb(uid, - completion_cb, - exception)) + async_copy = AsyncCopy(file_path, destination_path, completion_cb) async_copy.start() - def _async_copy_completion_cb(self, uid, completion_cb, exception): - """Callback called when an asynchronous copy has finished. - - """ - if exception is None: - self._enqueue_checksum(uid) - completion_cb(exception) - - def _enqueue_checksum(self, uid): - """Add an entry to a queue of entries to be checked for duplicates. - - """ - queue_path = layoutmanager.get_instance().get_queue_path() - open(os.path.join(queue_path, uid), 'w').close() - logging.debug('_enqueue_checksum %r' % os.path.join(queue_path, uid)) - if self._enqueue_checksum_id is None: - self._enqueue_checksum_id = \ - gobject.idle_add(self._compute_checksum_cb, - priority=gobject.PRIORITY_LOW) - - def _identical_file_already_exists(self, checksum): - """Check if we already have files with this checksum. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(checksum_path) - - def _get_file_from_checksum(self, checksum): - """Get a file that matches checksum. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - first_file_link = os.listdir(checksum_path)[0] - first_file = os.readlink(os.path.join(checksum_path, first_file_link)) - return first_file - - def _create_checksum_dir(self, checksum): - """Create directory that tracks files with this same checksum. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - logging.debug('create dir %r' % checksum_path) - os.mkdir(checksum_path) - - def _add_checksum_entry(self, uid, checksum): - """Create a symbolic link in the checksum dir to the file in the entry - dir and another one in the entry path to the checksum dir. - - """ - entry_path = layoutmanager.get_instance().get_entry_path(uid) - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - - logging.debug('symlink %r -> %r' % (os.path.join(checksum_path, uid), - os.path.join(entry_path, 'data'))) - os.symlink(os.path.join(entry_path, 'data'), - os.path.join(checksum_path, uid)) - - logging.debug('symlink %r -> %r' % \ - (os.path.join(entry_path, 'checksum'), checksum_path)) - os.symlink(checksum_path, os.path.join(entry_path, 'checksum')) - - def _remove_checksum_entry(self, uid): - """Remove links created in _add_checksum_entry() and the checksum dir - if empty. - - """ - entry_path = layoutmanager.get_instance().get_entry_path(uid) - checksum = os.readlink(os.path.join(entry_path, 'checksum')) - - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - - os.remove(os.path.join(checksum_path, uid)) - try: - os.rmdir(checksum_path) - except OSError, e: - if e.errno != errno.ENOTEMPTY: - raise - - os.remove(os.path.join(entry_path, 'checksum')) - - def _already_linked(self, uid, checksum): - """Check if this entry's file is already a hard link to the checksums - dir. - - """ - checksums_dir = layoutmanager.get_instance().get_checksums_dir() - checksum_path = os.path.join(checksums_dir, checksum) - return os.path.exists(os.path.join(checksum_path, uid)) - - def _compute_checksum_cb(self): - """Process one item in the checksums queue by calculating its checksum, - checking if there exist already an identical file, and in that case - substituting its file with a hard link to that pre-existing file. - - """ - queue_path = layoutmanager.get_instance().get_queue_path() - queue = os.listdir(queue_path) - if queue: - uid = queue[0] - logging.debug('_compute_checksum_cb processing %r' % uid) - entry_path = layoutmanager.get_instance().get_entry_path(uid) - file_in_entry_path = os.path.join(entry_path, 'data') - checksum = self._calculate_md5sum(file_in_entry_path) - - if self._identical_file_already_exists(checksum): - if not self._already_linked(uid, checksum): - logging.debug('delete %r' % file_in_entry_path) - os.remove(file_in_entry_path) - - existing_file = self._get_file_from_checksum(checksum) - logging.debug('link %r -> %r' % \ - (existing_file, file_in_entry_path)) - os.link(existing_file, file_in_entry_path) - - self._add_checksum_entry(uid, checksum) - else: - self._create_checksum_dir(checksum) - self._add_checksum_entry(uid, checksum) - - os.remove(os.path.join(queue_path, uid)) - - if len(queue) <= 1: - self._enqueue_checksum_id = None - return False - else: - return True - - def _calculate_md5sum(self, path): - """Calculate the md5 checksum of a given file. - - """ - in_, out = os.popen2(['md5sum', path]) - return out.read().split(' ', 1)[0] - def retrieve(self, uid, user_id): """Place the file associated to a given entry into a directory where the user can read it. The caller is reponsible for deleting this file. @@ -248,6 +101,10 @@ class FileStore(object): return destination_path + def get_file_path(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + return os.path.join(dir_path, 'data') + def delete(self, uid): """Remove the file associated to a given entry. @@ -255,7 +112,6 @@ class FileStore(object): dir_path = layoutmanager.get_instance().get_entry_path(uid) file_path = os.path.join(dir_path, 'data') if os.path.exists(file_path): - self._remove_checksum_entry(uid) os.remove(file_path) class AsyncCopy(object): diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py index d17f476..8d1e377 100644 --- a/src/olpc/datastore/metadatastore.py +++ b/src/olpc/datastore/metadatastore.py @@ -20,11 +20,7 @@ class MetadataStore(object): metadata['uid'] = uid for key, value in metadata.items(): - f = open(os.path.join(metadata_path, key), 'w+') - try: - f.write(str(value)) - finally: - f.close() + open(os.path.join(metadata_path, key), 'w').write(str(value)) def retrieve(self, uid, properties=None): dir_path = layoutmanager.get_instance().get_entry_path(uid) @@ -37,3 +33,18 @@ class MetadataStore(object): os.remove(os.path.join(metadata_path, key)) os.rmdir(metadata_path) + def get_property(self, uid, key): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + if os.path.exists(property_path): + return open(property_path, 'r').read() + else: + return None + + def set_property(self, uid, key, value): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + open(property_path, 'w').write(value) + diff --git a/src/olpc/datastore/migration.py b/src/olpc/datastore/migration.py index ec27526..da3873e 100644 --- a/src/olpc/datastore/migration.py +++ b/src/olpc/datastore/migration.py @@ -1,7 +1,5 @@ import os import logging -import traceback -import sys import shutil import cjson diff --git a/src/olpc/datastore/optimizer.py b/src/olpc/datastore/optimizer.py new file mode 100644 index 0000000..0eedda8 --- /dev/null +++ b/src/olpc/datastore/optimizer.py @@ -0,0 +1,149 @@ +import os +import errno +import logging + +import gobject + +from olpc.datastore import layoutmanager + +class Optimizer(object): + """Optimizes disk space usage by detecting duplicates and sharing storage. + + """ + def __init__(self, file_store, metadata_store): + self._file_store = file_store + self._metadata_store = metadata_store + self._enqueue_checksum_id = None + + def optimize(self, uid): + """Add an entry to a queue of entries to be checked for duplicates. + + """ + if not os.path.exists(self._file_store.get_file_path(uid)): + return + + queue_path = layoutmanager.get_instance().get_queue_path() + open(os.path.join(queue_path, uid), 'w').close() + logging.debug('optimize %r' % os.path.join(queue_path, uid)) + + if self._enqueue_checksum_id is None: + self._enqueue_checksum_id = \ + gobject.idle_add(self._process_entry_cb, + priority=gobject.PRIORITY_LOW) + + def remove(self, uid): + """Remove any structures left from space optimization + + """ + checksum = self._metadata_store.get_property(uid, 'checksum') + if checksum is None: + return + + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + logging.debug('remove %r' % os.path.join(checksum_path, uid)) + os.remove(os.path.join(checksum_path, uid)) + try: + os.rmdir(checksum_path) + logging.debug('removed %r' % checksum_path) + except OSError, e: + if e.errno != errno.ENOTEMPTY: + raise + + def _identical_file_already_exists(self, checksum): + """Check if we already have files with this checksum. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(checksum_path) + + def _get_file_from_checksum(self, checksum): + """Get a file that matches checksum. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + first_file_link = os.listdir(checksum_path)[0] + first_file = os.readlink(os.path.join(checksum_path, first_file_link)) + return first_file + + def _create_checksum_dir(self, checksum): + """Create directory that tracks files with this same checksum. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + logging.debug('create dir %r' % checksum_path) + os.mkdir(checksum_path) + + def _add_checksum_entry(self, uid, checksum): + """Create a symbolic link in the checksum dir to the file in the entry + dir. + + """ + entry_path = layoutmanager.get_instance().get_entry_path(uid) + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + logging.debug('symlink %r -> %r' % (os.path.join(checksum_path, uid), + os.path.join(entry_path, 'data'))) + os.symlink(os.path.join(entry_path, 'data'), + os.path.join(checksum_path, uid)) + + def _already_linked(self, uid, checksum): + """Check if this entry's file is already a hard link to the checksums + dir. + + """ + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(os.path.join(checksum_path, uid)) + + def _process_entry_cb(self): + """Process one item in the checksums queue by calculating its checksum, + checking if there exist already an identical file, and in that case + substituting its file with a hard link to that pre-existing file. + + """ + queue_path = layoutmanager.get_instance().get_queue_path() + queue = os.listdir(queue_path) + if queue: + uid = queue[0] + logging.debug('_process_entry_cb processing %r' % uid) + entry_path = layoutmanager.get_instance().get_entry_path(uid) + file_in_entry_path = os.path.join(entry_path, 'data') + checksum = self._calculate_md5sum(file_in_entry_path) + self._metadata_store.set_property(uid, 'checksum', checksum) + + if self._identical_file_already_exists(checksum): + if not self._already_linked(uid, checksum): + logging.debug('delete %r' % file_in_entry_path) + os.remove(file_in_entry_path) + + existing_file = self._get_file_from_checksum(checksum) + logging.debug('link %r -> %r' % \ + (existing_file, file_in_entry_path)) + os.link(existing_file, file_in_entry_path) + + self._add_checksum_entry(uid, checksum) + else: + self._create_checksum_dir(checksum) + self._add_checksum_entry(uid, checksum) + + os.remove(os.path.join(queue_path, uid)) + + if len(queue) <= 1: + self._enqueue_checksum_id = None + return False + else: + return True + + def _calculate_md5sum(self, path): + """Calculate the md5 checksum of a given file. + + """ + in_, out = os.popen2(['md5sum', path]) + return out.read().split(' ', 1)[0] + diff --git a/tests/test_sugar.py b/tests/test_sugar.py index ab4a897..1dbb607 100644 --- a/tests/test_sugar.py +++ b/tests/test_sugar.py @@ -108,12 +108,14 @@ class FunctionalityTest(CommonTest): props = self._data_store.get_properties(uid, byte_arrays=True) del props['uid'] del props['mountpoint'] + del props['checksum'] assert props == PROPS_WITHOUT_PREVIEW t = self.update(uid) props = self._data_store.get_properties(uid, byte_arrays=True) del props['uid'] del props['mountpoint'] + del props['checksum'] assert props == PROPS_WITH_PREVIEW file_name = self._data_store.get_filename(uid) |