diff options
author | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-08-26 10:39:44 (GMT) |
---|---|---|
committer | Tomeu Vizoso <tomeu@tomeuvizoso.net> | 2008-08-26 10:39:44 (GMT) |
commit | ee53b9e35feb4a0a4c00035c1e4d4ea973cf2a16 (patch) | |
tree | cc81293cafd7df0a8c8494f86dc1a8c9207d5578 | |
parent | a8736ad26ba5d55e1c96e6c5587350d65d6f3b1c (diff) |
Implement hard-linking to identical files and refactor path logic to LayoutManager
-rw-r--r-- | src/olpc/datastore/Makefile.am | 1 | ||||
-rw-r--r-- | src/olpc/datastore/datastore.py | 46 | ||||
-rw-r--r-- | src/olpc/datastore/filestore.py | 166 | ||||
-rw-r--r-- | src/olpc/datastore/indexstore.py | 10 | ||||
-rw-r--r-- | src/olpc/datastore/layoutmanager.py | 36 | ||||
-rw-r--r-- | src/olpc/datastore/metadatastore.py | 19 |
6 files changed, 215 insertions, 63 deletions
diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am index 5bfc229..708f251 100644 --- a/src/olpc/datastore/Makefile.am +++ b/src/olpc/datastore/Makefile.am @@ -4,5 +4,6 @@ datastore_PYTHON = \ datastore.py \ filestore.py \ indexstore.py \ + layoutmanager.py \ metadatastore.py \ __version__.py diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py index 23c1f21..7c041ef 100644 --- a/src/olpc/datastore/datastore.py +++ b/src/olpc/datastore/datastore.py @@ -12,12 +12,11 @@ __license__ = 'The GNU Public License V2+' import logging import uuid -import os.path import time -import dbus.service -import dbus.mainloop.glib +import dbus +from olpc.datastore import layoutmanager from olpc.datastore.metadatastore import MetadataStore from olpc.datastore.indexstore import IndexStore from olpc.datastore.filestore import FileStore @@ -40,20 +39,10 @@ class DataStore(dbus.service.Object): allow_replacement=False) dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) - profile = os.environ.get('SUGAR_PROFILE', 'default') - base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) - - self._root_path = os.path.join(base_dir, 'datastore2') - if not os.path.exists(self._root_path): - os.makedirs(self._root_path) - self._metadata_store = MetadataStore() - self._index_store = IndexStore(self._root_path) + self._index_store = IndexStore() self._file_store = FileStore() - def _get_entry_path(self, uid): - return os.path.join(self._root_path, uid[:2], uid) - def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \ (async_cb, async_err_cb, uid, exc)) @@ -73,14 +62,13 @@ class DataStore(dbus.service.Object): def create(self, props, file_path, transfer_ownership, async_cb, async_err_cb): uid = str(uuid.uuid4()) - dir_path = self._get_entry_path(uid) if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) - self._metadata_store.store(uid, props, dir_path) + self._metadata_store.store(uid, props) self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, dir_path, + self._file_store.store(uid, file_path, transfer_ownership, lambda *args: self._create_completion_cb(async_cb, async_err_cb, uid, @@ -108,14 +96,12 @@ class DataStore(dbus.service.Object): byte_arrays=True) def update(self, uid, props, file_path, transfer_ownership, async_cb, async_err_cb): - dir_path = self._get_entry_path(uid) - if not props.get('timestamp', ''): props['timestamp'] = int(time.time()) - self._metadata_store.store(uid, props, dir_path) + self._metadata_store.store(uid, props) self._index_store.store(uid, props) - self._file_store.store(uid, file_path, transfer_ownership, dir_path, + self._file_store.store(uid, file_path, transfer_ownership, lambda *args: self._update_completion_cb(async_cb, async_err_cb, uid, @@ -133,8 +119,7 @@ class DataStore(dbus.service.Object): uids, count = self._index_store.find(query) entries = [] for uid in uids: - dir_path = self._get_entry_path(uid) - metadata = self._metadata_store.retrieve(uid, dir_path, properties) + metadata = self._metadata_store.retrieve(uid, properties) entries.append(metadata) logger.debug('find(): %r' % (time.time() - t)) return entries, count @@ -145,15 +130,13 @@ class DataStore(dbus.service.Object): sender_keyword='sender') def get_filename(self, uid, sender=None): user_id = dbus.Bus().get_unix_user(sender) - dir_path = self._get_entry_path(uid) - return self._file_store.retrieve(uid, dir_path, user_id) + return self._file_store.retrieve(uid, user_id) @dbus.service.method(DS_DBUS_INTERFACE, in_signature='s', out_signature='a{sv}') def get_properties(self, uid): - dir_path = self._get_entry_path(uid) - return self._metadata_store.retrieve(uid, dir_path) + return self._metadata_store.retrieve(uid) @dbus.service.method(DS_DBUS_INTERFACE, in_signature='sa{sv}', @@ -169,10 +152,13 @@ class DataStore(dbus.service.Object): in_signature='s', out_signature='') def delete(self, uid): - dir_path = self._get_entry_path(uid) self._index_store.delete(uid) - self._file_store.delete(uid, dir_path) - self._metadata_store.delete(uid, dir_path) + self._file_store.delete(uid) + self._metadata_store.delete(uid) + + entry_path = layoutmanager.get_instance().get_entry_path(uid) + os.removedirs(entry_path) + self.Deleted(uid) logger.debug("deleted %s" % uid) diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py index 18097f8..15b0185 100644 --- a/src/olpc/datastore/filestore.py +++ b/src/olpc/datastore/filestore.py @@ -1,36 +1,154 @@ import os -import time import errno import logging import gobject +from olpc.datastore import layoutmanager + class FileStore(object): - def store(self, uid, file_path, transfer_ownership, dir_path, completion_cb): + def __init__(self): + self._enqueue_checksum_id = None + + # TODO: add protection against store and retrieve operations on entries + # that are being processed async. + + def store(self, uid, file_path, transfer_ownership, completion_cb): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + destination_path = os.path.join(dir_path, uid) if os.path.exists(file_path): if transfer_ownership: try: os.rename(file_path, destination_path) + self._enqueue_checksum(uid) completion_cb() except OSError, e: - if e.errno == errno.EXDEV: - async_copy = AsyncCopy(file_path, destination_path, completion_cb) - async_copy.start() - else: - raise + if e.errno == errno.EXDEV: + self._async_copy(uid, file_path, destination_path, + completion_cb) + else: + raise else: - raise NotImplementedError() + self._async_copy(uid, file_path, destination_path, + completion_cb) elif file_path == '' and os.path.exists(destination_path): os.remove(destination_path) completion_cb() else: completion_cb() - def _copy_completion_cb(self, completion_cb, exc=None): - completion_cb(exc) + def _async_copy(self, uid, file_path, destination_path, completion_cb): + async_copy = AsyncCopy(file_path, destination_path, + lambda: self._async_copy_completion_cb(uid, completion_cb)) + async_copy.start() + + def _async_copy_completion_cb(self, uid, completion_cb): + self._enqueue_checksum(uid) + completion_cb() + + def _enqueue_checksum(self, uid): + queue_path = layoutmanager.get_instance().get_queue_path() + open(os.path.join(queue_path, uid), 'w').close() + logging.debug('_enqueue_checksum %r' % os.path.join(queue_path, uid)) + if self._enqueue_checksum_id is None: + self._enqueue_checksum_id = \ + gobject.idle_add(self._compute_checksum_cb) + + def _identical_file_already_exists(self, checksum): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(checksum_path) + + def _get_file_from_checksum(self, checksum): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + first_file_link = os.listdir(checksum_path)[0] + first_file = os.readlink(os.path.join(checksum_path, first_file_link)) + return first_file + + def _create_checksum_dir(self, checksum): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + logging.debug('create dir %r' % checksum_path) + os.mkdir(checksum_path) + + def _add_checksum_entry(self, uid, checksum): + entry_path = layoutmanager.get_instance().get_entry_path(uid) + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + logging.debug('symlink %r -> %r' % (os.path.join(entry_path, uid), + os.path.join(checksum_path, uid))) + os.symlink(os.path.join(entry_path, uid), + os.path.join(checksum_path, uid)) + + logging.debug('symlink %r -> %r' % \ + (checksum_path, os.path.join(entry_path, 'checksum'))) + os.symlink(checksum_path, os.path.join(entry_path, 'checksum')) + + def _remove_checksum_entry(self, uid): + entry_path = layoutmanager.get_instance().get_entry_path(uid) + checksum = os.readlink(os.path.join(entry_path, 'checksum')) + + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + + os.remove(os.path.join(checksum_path, uid)) + try: + os.rmdir(checksum_path) + except OSError, e: + if e.errno != errno.ENOTEMPTY: + raise + + os.remove(os.path.join(entry_path, 'checksum')) + + def _already_linked(self, uid, checksum): + checksums_dir = layoutmanager.get_instance().get_checksums_dir() + checksum_path = os.path.join(checksums_dir, checksum) + return os.path.exists(os.path.join(checksum_path, uid)) + + def _compute_checksum_cb(self): + queue_path = layoutmanager.get_instance().get_queue_path() + queue = os.listdir(queue_path) + if queue: + uid = queue[0] + logging.debug('_compute_checksum_cb processing %r' % uid) + entry_path = layoutmanager.get_instance().get_entry_path(uid) + file_in_entry_path = os.path.join(entry_path, uid) + checksum = self._calculate_md5sum(os.path.join(entry_path, uid)) + + if self._identical_file_already_exists(checksum): + if not self._already_linked(uid, checksum): + logging.debug('delete %r' % file_in_entry_path) + os.remove(file_in_entry_path) + + existing_file = self._get_file_from_checksum(checksum) + logging.debug('link %r -> %r' % \ + (existing_file, file_in_entry_path)) + os.link(existing_file, file_in_entry_path) + + self._add_checksum_entry(uid, checksum) + else: + self._create_checksum_dir(checksum) + self._add_checksum_entry(uid, checksum) + + os.remove(os.path.join(queue_path, uid)) + + if len(queue) <= 1: + self._enqueue_checksum_id = None + return False + else: + return True + + def _calculate_md5sum(self, path): + in_, out = os.popen2(['md5sum', path]) + return out.read().split(' ', 1)[0] - def retrieve(self, uid, dir_path, user_id): + def retrieve(self, uid, user_id): + dir_path = layoutmanager.get_instance().get_entry_path(uid) file_path = os.path.join(dir_path, uid) if not os.path.exists(file_path): return '' @@ -44,8 +162,8 @@ class FileStore(object): 'uid_to_instance_dir', str(user_id)) else: profile = os.environ.get('SUGAR_PROFILE', 'default') - destination_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile, - 'data') + destination_dir = os.path.join(os.path.expanduser('~'), '.sugar', + profile, 'data') if not os.path.exists(destination_dir): os.makedirs(destination_dir) @@ -71,13 +189,15 @@ class FileStore(object): return destination_path - def delete(self, uid, dir_path): + def delete(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) file_path = os.path.join(dir_path, uid) if os.path.exists(file_path): + self._remove_checksum_entry(uid) os.remove(file_path) class AsyncCopy: - CHUNK_SIZE=65536 + CHUNK_SIZE = 65536 def __init__(self, src, dest, completion): self.src = src @@ -100,10 +220,11 @@ class AsyncCopy: # error writing data to file? if count < len(data): - logging.error("AC: Error writing %s -> %s: wrote less than expected" % \ - (self.src, self.dest)) + logging.error('AC: Error writing %s -> %s: wrote less than ' + 'expected' % (self.src, self.dest)) self._cleanup() - self.completion(RuntimeError("Error writing data to destination file")) + self.completion(RuntimeError( + 'Error writing data to destination file')) return False # FIXME: emit progress here @@ -114,7 +235,8 @@ class AsyncCopy: self.completion(None) return False except Exception, err: - logging.error("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err)) + logging.error("AC: Error copying %s -> %s: %r" % \ + (self.src, self.dest, err)) self._cleanup() self.completion(err) return False @@ -123,11 +245,11 @@ class AsyncCopy: def start(self): self.src_fp = os.open(self.src, os.O_RDONLY) - self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, + 0644) stat = os.fstat(self.src_fp) self.size = stat[6] - self.tstart = time.time() - sid = gobject.idle_add(self._copy_block) + gobject.idle_add(self._copy_block) diff --git a/src/olpc/datastore/indexstore.py b/src/olpc/datastore/indexstore.py index 549b742..0153174 100644 --- a/src/olpc/datastore/indexstore.py +++ b/src/olpc/datastore/indexstore.py @@ -1,4 +1,3 @@ -import os import logging import time import sys @@ -6,6 +5,8 @@ import sys import xapian from xapian import WritableDatabase, Document, Enquire, Query, QueryParser +from olpc.datastore import layoutmanager + _MAX_LIMIT = 4096 _VALUE_UID = 0 @@ -22,8 +23,8 @@ _PREFIX_MIME_TYPE = 'M' _PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview'] class IndexStore(object): - def __init__(self, root_path): - index_path = os.path.join(root_path, 'index') + def __init__(self): + index_path = layoutmanager.get_instance().get_index_path() self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN) def _document_exists(self, uid): @@ -156,7 +157,8 @@ class IndexStore(object): start = time.mktime(time.strptime(start, DATE_FORMAT)) end = mtime_range['end'][:-7] - # FIXME: this will give an unexpected result if the journal is in a different timezone + # FIXME: this will give an unexpected result if the journal is in a + # different timezone end = time.mktime(time.strptime(end, DATE_FORMAT)) query['timestamp'] = {'start': int(start), 'end': int(end)} diff --git a/src/olpc/datastore/layoutmanager.py b/src/olpc/datastore/layoutmanager.py new file mode 100644 index 0000000..e9c435e --- /dev/null +++ b/src/olpc/datastore/layoutmanager.py @@ -0,0 +1,36 @@ +import os + +class LayoutManager(object): + def __init__(self): + profile = os.environ.get('SUGAR_PROFILE', 'default') + base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) + + self._root_path = os.path.join(base_dir, 'datastore2') + + self._create_if_needed(self._root_path) + self._create_if_needed(self.get_checksums_dir()) + self._create_if_needed(self.get_queue_path()) + + def _create_if_needed(self, path): + if not os.path.exists(path): + os.makedirs(path) + + def get_entry_path(self, uid): + return os.path.join(self._root_path, uid[:2], uid) + + def get_index_path(self): + return os.path.join(self._root_path, 'index') + + def get_checksums_dir(self): + return os.path.join(self._root_path, 'checksums') + + def get_queue_path(self): + return os.path.join(self.get_checksums_dir(), 'queue') + +_instance = None +def get_instance(): + global _instance + if _instance is None: + _instance = LayoutManager() + return _instance + diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py index d0e1e5a..f15f1af 100644 --- a/src/olpc/datastore/metadatastore.py +++ b/src/olpc/datastore/metadatastore.py @@ -8,12 +8,15 @@ except ImportError: import simplejson has_cjson = False +from olpc.datastore import layoutmanager + MAX_SIZE = 256 class MetadataStore(object): - def store(self, uid, metadata, dir_path): + def store(self, uid, metadata): metadata = metadata.copy() + dir_path = layoutmanager.get_instance().get_entry_path(uid) if not os.path.exists(dir_path): os.makedirs(dir_path) @@ -28,10 +31,11 @@ class MetadataStore(object): metadata['uid'] = uid self._encode(metadata, os.path.join(dir_path, 'metadata')) - def retrieve(self, uid, dir_path, properties=[]): + def retrieve(self, uid, properties=None): import time t = time.time() + dir_path = layoutmanager.get_instance().get_entry_path(uid) if not os.path.exists(dir_path): raise ValueError('Unknown object: %r' % uid) @@ -42,7 +46,7 @@ class MetadataStore(object): metadata = {} if properties: - for key, value in metadata.items(): + for key, value_ in metadata.items(): if key not in properties: del metadata[key] @@ -53,14 +57,15 @@ class MetadataStore(object): continue file_path = os.path.join(extra_metadata_dir, key) if not os.path.isdir(file_path): - # TODO: This class shouldn't know anything about dbus, probably. + # TODO: This class shouldn't know anything about dbus. import dbus metadata[key] = dbus.ByteArray(open(file_path).read()) logging.debug('retrieve metadata: %r' % (time.time() - t)) return metadata - def delete(self, uid, dir_path): + def delete(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) metadata_path = os.path.join(dir_path, 'metadata') if os.path.isfile(metadata_path): os.remove(os.path.join(dir_path, 'metadata')) @@ -71,7 +76,7 @@ class MetadataStore(object): if os.path.isdir(extra_metadata_path): for key in os.listdir(extra_metadata_path): os.remove(os.path.join(extra_metadata_path, key)) - os.removedirs(os.path.join(dir_path, 'extra_metadata')) + os.rmdir(os.path.join(dir_path, 'extra_metadata')) else: logging.warning('%s is not a valid path' % extra_metadata_path) @@ -85,7 +90,7 @@ class MetadataStore(object): def _is_unicode(self, string): try: - temp = string.decode('utf-8') + string.decode('utf-8') return True except UnicodeDecodeError: return False |