author     Tomeu Vizoso <tomeu@tomeuvizoso.net>    2008-09-30 16:30:17 (GMT)
committer  Tomeu Vizoso <tomeu@tomeuvizoso.net>    2008-09-30 16:30:17 (GMT)
commit     2f325039969506164a2c79f6e73ea7ec56ee71c1 (patch)
tree       f04d6fed1a5c93e9062af0da7d7926dc96106426
parent     8e4d5a80c16beff4aff836a0370a569ac739fe91 (diff)
Refactor the hardlink optimization out from filestore and store the checksum as a metadata property
-rw-r--r--   src/olpc/datastore/Makefile.am          3
-rw-r--r--   src/olpc/datastore/datastore.py        11
-rw-r--r--   src/olpc/datastore/filestore.py       166
-rw-r--r--   src/olpc/datastore/metadatastore.py    21
-rw-r--r--   src/olpc/datastore/migration.py         2
-rw-r--r--   src/olpc/datastore/optimizer.py       149
-rw-r--r--   tests/test_sugar.py                     2
7 files changed, 191 insertions, 163 deletions
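
The change in a nutshell: instead of FileStore quietly hard-linking identical files and recording the checksum as a symlink inside the entry directory, a new Optimizer class owns the duplicate detection, and the checksum is written back as a plain metadata property. A minimal sketch of the underlying idea follows (hypothetical helper, simplified to one hard link per checksum; the real layout keeps a directory per checksum with one symlink per entry):

    import hashlib
    import os

    def deduplicate(file_path, checksums_dir):
        # Compute the md5 of the candidate file.
        f = open(file_path, 'rb')
        try:
            checksum = hashlib.md5(f.read()).hexdigest()
        finally:
            f.close()

        canonical = os.path.join(checksums_dir, checksum)
        if os.path.exists(canonical):
            # An identical file is already tracked: share its storage.
            os.remove(file_path)
            os.link(canonical, file_path)
        else:
            # First file with this checksum: remember it for later duplicates.
            os.link(file_path, canonical)
        return checksum
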
diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am
index 6d6c2fe..062e9b0 100644
--- a/src/olpc/datastore/Makefile.am
+++ b/src/olpc/datastore/Makefile.am
@@ -6,7 +6,8 @@ datastore_PYTHON = \
indexstore.py \
layoutmanager.py \
metadatastore.py \
- migration.py
+ migration.py \
+ optimizer.py
AM_CPPFLAGS = \
$(WARN_CFLAGS) \
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index 60b29f5..a7a86d8 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -24,6 +24,7 @@ from olpc.datastore.layoutmanager import MAX_QUERY_LIMIT
from olpc.datastore.metadatastore import MetadataStore
from olpc.datastore.indexstore import IndexStore
from olpc.datastore.filestore import FileStore
+from olpc.datastore.optimizer import Optimizer
# the name used by the logger
DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
@@ -66,6 +67,8 @@ class DataStore(dbus.service.Object):
logging.debug('Index is not up-to-date, will update')
self._rebuild_index()
+ self._optimizer = Optimizer(self._file_store, self._metadata_store)
+
def _rebuild_index(self):
uids = layoutmanager.get_instance().find_all()
logging.debug('Going to update the index with uids %r' % uids)
@@ -98,6 +101,7 @@ class DataStore(dbus.service.Object):
return
self.Created(uid)
+ self._optimizer.optimize(uid)
logger.debug("created %s" % uid)
async_cb(uid)
@@ -133,6 +137,7 @@ class DataStore(dbus.service.Object):
return
self.Updated(uid)
+ self._optimizer.optimize(uid)
logger.debug("updated %s" % uid)
async_cb()
@@ -148,6 +153,10 @@ class DataStore(dbus.service.Object):
self._metadata_store.store(uid, props)
self._index_store.store(uid, props)
+
+ if os.path.exists(self._file_store.get_file_path(uid)) and \
+ (not file_path or os.path.exists(file_path)):
+ self._optimizer.remove(uid)
self._file_store.store(uid, file_path, transfer_ownership,
lambda *args: self._update_completion_cb(async_cb,
async_err_cb,
@@ -230,6 +239,8 @@ class DataStore(dbus.service.Object):
in_signature='s',
out_signature='')
def delete(self, uid):
+ self._optimizer.remove(uid)
+
self._index_store.delete(uid)
self._file_store.delete(uid)
self._metadata_store.delete(uid)
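
With these hooks in place the lifecycle is: create() and update() run the optimizer right after emitting Created/Updated, update() first tears down any existing checksum bookkeeping when the entry's data file is about to be removed or replaced, and delete() does the same before dropping the entry. A hedged sketch of the guard added to update(), using only calls visible in this diff (file_store and optimizer stand for the DataStore's own instances):

    import os

    def maybe_unoptimize(file_store, optimizer, uid, file_path):
        # Sketch: if the entry already has a stored data file and that file is
        # about to be removed or replaced, drop the optimizer's checksum
        # bookkeeping first; optimize() will recreate it after the update.
        entry_file = file_store.get_file_path(uid)
        if os.path.exists(entry_file) and \
                (not file_path or os.path.exists(file_path)):
            optimizer.remove(uid)
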
diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py
index 2c90fe0..0640403 100644
--- a/src/olpc/datastore/filestore.py
+++ b/src/olpc/datastore/filestore.py
@@ -10,11 +10,8 @@ class FileStore(object):
"""Handle the storage of one file per entry.
"""
- def __init__(self):
- self._enqueue_checksum_id = None
-
- # TODO: add protection against store and retrieve operations on entries
- # that are being processed async.
+ # TODO: add protection against store and retrieve operations on entries
+ # that are being processed async.
def store(self, uid, file_path, transfer_ownership, completion_cb):
"""Store a file for a given entry.
@@ -33,17 +30,15 @@ class FileStore(object):
logging.debug('FileStore moving from %r to %r' % \
(file_path, destination_path))
os.rename(file_path, destination_path)
- self._enqueue_checksum(uid)
completion_cb()
except OSError, e:
if e.errno == errno.EXDEV:
- self._async_copy(uid, file_path, destination_path,
- completion_cb)
+ self._async_copy(file_path, destination_path,
+ completion_cb)
else:
raise
else:
- self._async_copy(uid, file_path, destination_path,
- completion_cb)
+ self._async_copy(file_path, destination_path, completion_cb)
elif not file_path and os.path.exists(destination_path):
os.remove(destination_path)
completion_cb()
@@ -51,157 +46,15 @@ class FileStore(object):
logging.debug('FileStore: Nothing to do')
completion_cb()
- def _async_copy(self, uid, file_path, destination_path, completion_cb):
+ def _async_copy(self, file_path, destination_path, completion_cb):
"""Start copying a file asynchronously.
"""
logging.debug('FileStore copying from %r to %r' % \
(file_path, destination_path))
- async_copy = AsyncCopy(file_path, destination_path,
- lambda exception: self._async_copy_completion_cb(uid,
- completion_cb,
- exception))
+ async_copy = AsyncCopy(file_path, destination_path, completion_cb)
async_copy.start()
- def _async_copy_completion_cb(self, uid, completion_cb, exception):
- """Callback called when an asynchronous copy has finished.
-
- """
- if exception is None:
- self._enqueue_checksum(uid)
- completion_cb(exception)
-
- def _enqueue_checksum(self, uid):
- """Add an entry to a queue of entries to be checked for duplicates.
-
- """
- queue_path = layoutmanager.get_instance().get_queue_path()
- open(os.path.join(queue_path, uid), 'w').close()
- logging.debug('_enqueue_checksum %r' % os.path.join(queue_path, uid))
- if self._enqueue_checksum_id is None:
- self._enqueue_checksum_id = \
- gobject.idle_add(self._compute_checksum_cb,
- priority=gobject.PRIORITY_LOW)
-
- def _identical_file_already_exists(self, checksum):
- """Check if we already have files with this checksum.
-
- """
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(checksum_path)
-
- def _get_file_from_checksum(self, checksum):
- """Get a file that matches checksum.
-
- """
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
- first_file_link = os.listdir(checksum_path)[0]
- first_file = os.readlink(os.path.join(checksum_path, first_file_link))
- return first_file
-
- def _create_checksum_dir(self, checksum):
- """Create directory that tracks files with this same checksum.
-
- """
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
- logging.debug('create dir %r' % checksum_path)
- os.mkdir(checksum_path)
-
- def _add_checksum_entry(self, uid, checksum):
- """Create a symbolic link in the checksum dir to the file in the entry
- dir and another one in the entry path to the checksum dir.
-
- """
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
-
- logging.debug('symlink %r -> %r' % (os.path.join(checksum_path, uid),
- os.path.join(entry_path, 'data')))
- os.symlink(os.path.join(entry_path, 'data'),
- os.path.join(checksum_path, uid))
-
- logging.debug('symlink %r -> %r' % \
- (os.path.join(entry_path, 'checksum'), checksum_path))
- os.symlink(checksum_path, os.path.join(entry_path, 'checksum'))
-
- def _remove_checksum_entry(self, uid):
- """Remove links created in _add_checksum_entry() and the checksum dir
- if empty.
-
- """
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
- checksum = os.readlink(os.path.join(entry_path, 'checksum'))
-
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
-
- os.remove(os.path.join(checksum_path, uid))
- try:
- os.rmdir(checksum_path)
- except OSError, e:
- if e.errno != errno.ENOTEMPTY:
- raise
-
- os.remove(os.path.join(entry_path, 'checksum'))
-
- def _already_linked(self, uid, checksum):
- """Check if this entry's file is already a hard link to the checksums
- dir.
-
- """
- checksums_dir = layoutmanager.get_instance().get_checksums_dir()
- checksum_path = os.path.join(checksums_dir, checksum)
- return os.path.exists(os.path.join(checksum_path, uid))
-
- def _compute_checksum_cb(self):
- """Process one item in the checksums queue by calculating its checksum,
- checking if there exist already an identical file, and in that case
- substituting its file with a hard link to that pre-existing file.
-
- """
- queue_path = layoutmanager.get_instance().get_queue_path()
- queue = os.listdir(queue_path)
- if queue:
- uid = queue[0]
- logging.debug('_compute_checksum_cb processing %r' % uid)
- entry_path = layoutmanager.get_instance().get_entry_path(uid)
- file_in_entry_path = os.path.join(entry_path, 'data')
- checksum = self._calculate_md5sum(file_in_entry_path)
-
- if self._identical_file_already_exists(checksum):
- if not self._already_linked(uid, checksum):
- logging.debug('delete %r' % file_in_entry_path)
- os.remove(file_in_entry_path)
-
- existing_file = self._get_file_from_checksum(checksum)
- logging.debug('link %r -> %r' % \
- (existing_file, file_in_entry_path))
- os.link(existing_file, file_in_entry_path)
-
- self._add_checksum_entry(uid, checksum)
- else:
- self._create_checksum_dir(checksum)
- self._add_checksum_entry(uid, checksum)
-
- os.remove(os.path.join(queue_path, uid))
-
- if len(queue) <= 1:
- self._enqueue_checksum_id = None
- return False
- else:
- return True
-
- def _calculate_md5sum(self, path):
- """Calculate the md5 checksum of a given file.
-
- """
- in_, out = os.popen2(['md5sum', path])
- return out.read().split(' ', 1)[0]
-
def retrieve(self, uid, user_id):
"""Place the file associated to a given entry into a directory where the
user can read it. The caller is responsible for deleting this file.
@@ -248,6 +101,10 @@ class FileStore(object):
return destination_path
+ def get_file_path(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ return os.path.join(dir_path, 'data')
+
def delete(self, uid):
"""Remove the file associated to a given entry.
@@ -255,7 +112,6 @@ class FileStore(object):
dir_path = layoutmanager.get_instance().get_entry_path(uid)
file_path = os.path.join(dir_path, 'data')
if os.path.exists(file_path):
- self._remove_checksum_entry(uid)
os.remove(file_path)
class AsyncCopy(object):
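
With the checksum queue moved out, FileStore is reduced to plain file plumbing; the only new surface is get_file_path(), which exposes the entry's data path so DataStore and the Optimizer can test for a stored file without going through layoutmanager themselves. A hedged usage sketch (file_store and optimizer are assumed to be the DataStore's instances):

    import os

    path = file_store.get_file_path(uid)
    if os.path.exists(path):        # get_file_path() only builds the path,
        optimizer.optimize(uid)     # it does not guarantee the file exists
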
diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py
index d17f476..8d1e377 100644
--- a/src/olpc/datastore/metadatastore.py
+++ b/src/olpc/datastore/metadatastore.py
@@ -20,11 +20,7 @@ class MetadataStore(object):
metadata['uid'] = uid
for key, value in metadata.items():
- f = open(os.path.join(metadata_path, key), 'w+')
- try:
- f.write(str(value))
- finally:
- f.close()
+ open(os.path.join(metadata_path, key), 'w').write(str(value))
def retrieve(self, uid, properties=None):
dir_path = layoutmanager.get_instance().get_entry_path(uid)
@@ -37,3 +33,18 @@ class MetadataStore(object):
os.remove(os.path.join(metadata_path, key))
os.rmdir(metadata_path)
+ def get_property(self, uid, key):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ property_path = os.path.join(metadata_path, key)
+ if os.path.exists(property_path):
+ return open(property_path, 'r').read()
+ else:
+ return None
+
+ def set_property(self, uid, key, value):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ metadata_path = os.path.join(dir_path, 'metadata')
+ property_path = os.path.join(metadata_path, key)
+ open(property_path, 'w').write(value)
+
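
Each property lives in its own file under <entry_path>/metadata/, so the new accessors are thin wrappers around a single read or write; get_property() returns None for a key that was never set, which Optimizer.remove() relies on to skip entries that were never optimized. A hedged usage sketch (metadata_store is assumed to be an existing MetadataStore, uid an existing entry):

    checksum = 'd41d8cd98f00b204e9800998ecf8427e'   # md5 of an empty file
    metadata_store.set_property(uid, 'checksum', checksum)
    assert metadata_store.get_property(uid, 'checksum') == checksum
    assert metadata_store.get_property(uid, 'no-such-key') is None
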
diff --git a/src/olpc/datastore/migration.py b/src/olpc/datastore/migration.py
index ec27526..da3873e 100644
--- a/src/olpc/datastore/migration.py
+++ b/src/olpc/datastore/migration.py
@@ -1,7 +1,5 @@
import os
import logging
-import traceback
-import sys
import shutil
import cjson
diff --git a/src/olpc/datastore/optimizer.py b/src/olpc/datastore/optimizer.py
new file mode 100644
index 0000000..0eedda8
--- /dev/null
+++ b/src/olpc/datastore/optimizer.py
@@ -0,0 +1,149 @@
+import os
+import errno
+import logging
+
+import gobject
+
+from olpc.datastore import layoutmanager
+
+class Optimizer(object):
+ """Optimizes disk space usage by detecting duplicates and sharing storage.
+
+ """
+ def __init__(self, file_store, metadata_store):
+ self._file_store = file_store
+ self._metadata_store = metadata_store
+ self._enqueue_checksum_id = None
+
+ def optimize(self, uid):
+ """Add an entry to a queue of entries to be checked for duplicates.
+
+ """
+ if not os.path.exists(self._file_store.get_file_path(uid)):
+ return
+
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ open(os.path.join(queue_path, uid), 'w').close()
+ logging.debug('optimize %r' % os.path.join(queue_path, uid))
+
+ if self._enqueue_checksum_id is None:
+ self._enqueue_checksum_id = \
+ gobject.idle_add(self._process_entry_cb,
+ priority=gobject.PRIORITY_LOW)
+
+ def remove(self, uid):
+ """Remove any structures left from space optimization
+
+ """
+ checksum = self._metadata_store.get_property(uid, 'checksum')
+ if checksum is None:
+ return
+
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('remove %r' % os.path.join(checksum_path, uid))
+ os.remove(os.path.join(checksum_path, uid))
+ try:
+ os.rmdir(checksum_path)
+ logging.debug('removed %r' % checksum_path)
+ except OSError, e:
+ if e.errno != errno.ENOTEMPTY:
+ raise
+
+ def _identical_file_already_exists(self, checksum):
+ """Check if we already have files with this checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(checksum_path)
+
+ def _get_file_from_checksum(self, checksum):
+ """Get a file that matches checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ first_file_link = os.listdir(checksum_path)[0]
+ first_file = os.readlink(os.path.join(checksum_path, first_file_link))
+ return first_file
+
+ def _create_checksum_dir(self, checksum):
+ """Create directory that tracks files with this same checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ logging.debug('create dir %r' % checksum_path)
+ os.mkdir(checksum_path)
+
+ def _add_checksum_entry(self, uid, checksum):
+ """Create a symbolic link in the checksum dir to the file in the entry
+ dir.
+
+ """
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('symlink %r -> %r' % (os.path.join(checksum_path, uid),
+ os.path.join(entry_path, 'data')))
+ os.symlink(os.path.join(entry_path, 'data'),
+ os.path.join(checksum_path, uid))
+
+ def _already_linked(self, uid, checksum):
+ """Check if this entry's file is already a hard link to the checksums
+ dir.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(os.path.join(checksum_path, uid))
+
+ def _process_entry_cb(self):
+ """Process one item in the checksums queue by calculating its checksum,
+ checking whether an identical file already exists, and in that case
+ substituting its file with a hard link to that pre-existing file.
+
+ """
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ queue = os.listdir(queue_path)
+ if queue:
+ uid = queue[0]
+ logging.debug('_process_entry_cb processing %r' % uid)
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_in_entry_path = os.path.join(entry_path, 'data')
+ checksum = self._calculate_md5sum(file_in_entry_path)
+ self._metadata_store.set_property(uid, 'checksum', checksum)
+
+ if self._identical_file_already_exists(checksum):
+ if not self._already_linked(uid, checksum):
+ logging.debug('delete %r' % file_in_entry_path)
+ os.remove(file_in_entry_path)
+
+ existing_file = self._get_file_from_checksum(checksum)
+ logging.debug('link %r -> %r' % \
+ (existing_file, file_in_entry_path))
+ os.link(existing_file, file_in_entry_path)
+
+ self._add_checksum_entry(uid, checksum)
+ else:
+ self._create_checksum_dir(checksum)
+ self._add_checksum_entry(uid, checksum)
+
+ os.remove(os.path.join(queue_path, uid))
+
+ if len(queue) <= 1:
+ self._enqueue_checksum_id = None
+ return False
+ else:
+ return True
+
+ def _calculate_md5sum(self, path):
+ """Calculate the md5 checksum of a given file.
+
+ """
+ in_, out = os.popen2(['md5sum', path])
+ return out.read().split(' ', 1)[0]
+
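
_calculate_md5sum() shells out to the md5sum binary through os.popen2, which is deprecated and ties the checksum format to the external tool's output. An alternative sketch (not part of this commit) computes the digest in-process with hashlib:

    import hashlib

    def calculate_md5sum(path, chunk_size=65536):
        # Read the file in chunks so large entries are not loaded fully into memory.
        md5 = hashlib.md5()
        f = open(path, 'rb')
        try:
            chunk = f.read(chunk_size)
            while chunk:
                md5.update(chunk)
                chunk = f.read(chunk_size)
        finally:
            f.close()
        return md5.hexdigest()
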
diff --git a/tests/test_sugar.py b/tests/test_sugar.py
index ab4a897..1dbb607 100644
--- a/tests/test_sugar.py
+++ b/tests/test_sugar.py
@@ -108,12 +108,14 @@ class FunctionalityTest(CommonTest):
props = self._data_store.get_properties(uid, byte_arrays=True)
del props['uid']
del props['mountpoint']
+ del props['checksum']
assert props == PROPS_WITHOUT_PREVIEW
t = self.update(uid)
props = self._data_store.get_properties(uid, byte_arrays=True)
del props['uid']
del props['mountpoint']
+ del props['checksum']
assert props == PROPS_WITH_PREVIEW
file_name = self._data_store.get_filename(uid)
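
Because the checksum is now an ordinary metadata property, get_properties() can return an extra 'checksum' key for entries whose file has already been processed by the optimizer's idle callback, which is why the functional test strips it before comparing against the expected dictionaries. A hedged sketch of what a client may observe (data_store and uid as in the test above):

    props = data_store.get_properties(uid, byte_arrays=True)
    if 'checksum' in props:
        # md5 hex digest of the entry's data file, i.e. 32 hex characters.
        assert len(props['checksum']) == 32
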