Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary   ·   refs   ·   log   ·   tree   ·   commit   ·   diff   ·   stats
diff options
context:
space:
mode:
author Tomeu Vizoso <tomeu@tomeuvizoso.net> 2008-08-26 10:39:44 (GMT)
committer Tomeu Vizoso <tomeu@tomeuvizoso.net> 2008-08-26 10:39:44 (GMT)
commit ee53b9e35feb4a0a4c00035c1e4d4ea973cf2a16 (patch)
tree cc81293cafd7df0a8c8494f86dc1a8c9207d5578
parent a8736ad26ba5d55e1c96e6c5587350d65d6f3b1c (diff)
Implement hard-linking to identical files and refactor path logic to LayoutManager
-rw-r--r--  src/olpc/datastore/Makefile.am  1
-rw-r--r--  src/olpc/datastore/datastore.py  46
-rw-r--r--  src/olpc/datastore/filestore.py  166
-rw-r--r--  src/olpc/datastore/indexstore.py  10
-rw-r--r--  src/olpc/datastore/layoutmanager.py  36
-rw-r--r--  src/olpc/datastore/metadatastore.py  19
6 files changed, 215 insertions, 63 deletions
diff --git a/src/olpc/datastore/Makefile.am b/src/olpc/datastore/Makefile.am
index 5bfc229..708f251 100644
--- a/src/olpc/datastore/Makefile.am
+++ b/src/olpc/datastore/Makefile.am
@@ -4,5 +4,6 @@ datastore_PYTHON = \
datastore.py \
filestore.py \
indexstore.py \
+ layoutmanager.py \
metadatastore.py \
__version__.py
diff --git a/src/olpc/datastore/datastore.py b/src/olpc/datastore/datastore.py
index 23c1f21..7c041ef 100644
--- a/src/olpc/datastore/datastore.py
+++ b/src/olpc/datastore/datastore.py
@@ -12,12 +12,11 @@ __license__ = 'The GNU Public License V2+'
import logging
import uuid
-import os.path
import time
-import dbus.service
-import dbus.mainloop.glib
+import dbus
+from olpc.datastore import layoutmanager
from olpc.datastore.metadatastore import MetadataStore
from olpc.datastore.indexstore import IndexStore
from olpc.datastore.filestore import FileStore
@@ -40,20 +39,10 @@ class DataStore(dbus.service.Object):
allow_replacement=False)
dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
- profile = os.environ.get('SUGAR_PROFILE', 'default')
- base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
-
- self._root_path = os.path.join(base_dir, 'datastore2')
- if not os.path.exists(self._root_path):
- os.makedirs(self._root_path)
-
self._metadata_store = MetadataStore()
- self._index_store = IndexStore(self._root_path)
+ self._index_store = IndexStore()
self._file_store = FileStore()
- def _get_entry_path(self, uid):
- return os.path.join(self._root_path, uid[:2], uid)
-
def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
(async_cb, async_err_cb, uid, exc))
@@ -73,14 +62,13 @@ class DataStore(dbus.service.Object):
def create(self, props, file_path, transfer_ownership,
async_cb, async_err_cb):
uid = str(uuid.uuid4())
- dir_path = self._get_entry_path(uid)
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props, dir_path)
+ self._metadata_store.store(uid, props)
self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership, dir_path,
+ self._file_store.store(uid, file_path, transfer_ownership,
lambda *args: self._create_completion_cb(async_cb,
async_err_cb,
uid,
@@ -108,14 +96,12 @@ class DataStore(dbus.service.Object):
byte_arrays=True)
def update(self, uid, props, file_path, transfer_ownership,
async_cb, async_err_cb):
- dir_path = self._get_entry_path(uid)
-
if not props.get('timestamp', ''):
props['timestamp'] = int(time.time())
- self._metadata_store.store(uid, props, dir_path)
+ self._metadata_store.store(uid, props)
self._index_store.store(uid, props)
- self._file_store.store(uid, file_path, transfer_ownership, dir_path,
+ self._file_store.store(uid, file_path, transfer_ownership,
lambda *args: self._update_completion_cb(async_cb,
async_err_cb,
uid,
@@ -133,8 +119,7 @@ class DataStore(dbus.service.Object):
uids, count = self._index_store.find(query)
entries = []
for uid in uids:
- dir_path = self._get_entry_path(uid)
- metadata = self._metadata_store.retrieve(uid, dir_path, properties)
+ metadata = self._metadata_store.retrieve(uid, properties)
entries.append(metadata)
logger.debug('find(): %r' % (time.time() - t))
return entries, count
@@ -145,15 +130,13 @@ class DataStore(dbus.service.Object):
sender_keyword='sender')
def get_filename(self, uid, sender=None):
user_id = dbus.Bus().get_unix_user(sender)
- dir_path = self._get_entry_path(uid)
- return self._file_store.retrieve(uid, dir_path, user_id)
+ return self._file_store.retrieve(uid, user_id)
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='s',
out_signature='a{sv}')
def get_properties(self, uid):
- dir_path = self._get_entry_path(uid)
- return self._metadata_store.retrieve(uid, dir_path)
+ return self._metadata_store.retrieve(uid)
@dbus.service.method(DS_DBUS_INTERFACE,
in_signature='sa{sv}',
@@ -169,10 +152,13 @@ class DataStore(dbus.service.Object):
in_signature='s',
out_signature='')
def delete(self, uid):
- dir_path = self._get_entry_path(uid)
self._index_store.delete(uid)
- self._file_store.delete(uid, dir_path)
- self._metadata_store.delete(uid, dir_path)
+ self._file_store.delete(uid)
+ self._metadata_store.delete(uid)
+
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ os.removedirs(entry_path)
+
self.Deleted(uid)
logger.debug("deleted %s" % uid)
diff --git a/src/olpc/datastore/filestore.py b/src/olpc/datastore/filestore.py
index 18097f8..15b0185 100644
--- a/src/olpc/datastore/filestore.py
+++ b/src/olpc/datastore/filestore.py
@@ -1,36 +1,154 @@
import os
-import time
import errno
import logging
import gobject
+from olpc.datastore import layoutmanager
+
class FileStore(object):
- def store(self, uid, file_path, transfer_ownership, dir_path, completion_cb):
+ def __init__(self):
+ self._enqueue_checksum_id = None
+
+ # TODO: add protection against store and retrieve operations on entries
+ # that are being processed async.
+
+ def store(self, uid, file_path, transfer_ownership, completion_cb):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ if not os.path.exists(dir_path):
+ os.makedirs(dir_path)
+
destination_path = os.path.join(dir_path, uid)
if os.path.exists(file_path):
if transfer_ownership:
try:
os.rename(file_path, destination_path)
+ self._enqueue_checksum(uid)
completion_cb()
except OSError, e:
- if e.errno == errno.EXDEV:
- async_copy = AsyncCopy(file_path, destination_path, completion_cb)
- async_copy.start()
- else:
- raise
+ if e.errno == errno.EXDEV:
+ self._async_copy(uid, file_path, destination_path,
+ completion_cb)
+ else:
+ raise
else:
- raise NotImplementedError()
+ self._async_copy(uid, file_path, destination_path,
+ completion_cb)
elif file_path == '' and os.path.exists(destination_path):
os.remove(destination_path)
completion_cb()
else:
completion_cb()
- def _copy_completion_cb(self, completion_cb, exc=None):
- completion_cb(exc)
+ def _async_copy(self, uid, file_path, destination_path, completion_cb):
+ async_copy = AsyncCopy(file_path, destination_path,
+ lambda: self._async_copy_completion_cb(uid, completion_cb))
+ async_copy.start()
+
+ def _async_copy_completion_cb(self, uid, completion_cb):
+ self._enqueue_checksum(uid)
+ completion_cb()
+
+ def _enqueue_checksum(self, uid):
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ open(os.path.join(queue_path, uid), 'w').close()
+ logging.debug('_enqueue_checksum %r' % os.path.join(queue_path, uid))
+ if self._enqueue_checksum_id is None:
+ self._enqueue_checksum_id = \
+ gobject.idle_add(self._compute_checksum_cb)
+
+ def _identical_file_already_exists(self, checksum):
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(checksum_path)
+
+ def _get_file_from_checksum(self, checksum):
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ first_file_link = os.listdir(checksum_path)[0]
+ first_file = os.readlink(os.path.join(checksum_path, first_file_link))
+ return first_file
+
+ def _create_checksum_dir(self, checksum):
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ logging.debug('create dir %r' % checksum_path)
+ os.mkdir(checksum_path)
+
+ def _add_checksum_entry(self, uid, checksum):
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('symlink %r -> %r' % (os.path.join(entry_path, uid),
+ os.path.join(checksum_path, uid)))
+ os.symlink(os.path.join(entry_path, uid),
+ os.path.join(checksum_path, uid))
+
+ logging.debug('symlink %r -> %r' % \
+ (checksum_path, os.path.join(entry_path, 'checksum')))
+ os.symlink(checksum_path, os.path.join(entry_path, 'checksum'))
+
+ def _remove_checksum_entry(self, uid):
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ checksum = os.readlink(os.path.join(entry_path, 'checksum'))
+
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ os.remove(os.path.join(checksum_path, uid))
+ try:
+ os.rmdir(checksum_path)
+ except OSError, e:
+ if e.errno != errno.ENOTEMPTY:
+ raise
+
+ os.remove(os.path.join(entry_path, 'checksum'))
+
+ def _already_linked(self, uid, checksum):
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(os.path.join(checksum_path, uid))
+
+ def _compute_checksum_cb(self):
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ queue = os.listdir(queue_path)
+ if queue:
+ uid = queue[0]
+ logging.debug('_compute_checksum_cb processing %r' % uid)
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_in_entry_path = os.path.join(entry_path, uid)
+ checksum = self._calculate_md5sum(os.path.join(entry_path, uid))
+
+ if self._identical_file_already_exists(checksum):
+ if not self._already_linked(uid, checksum):
+ logging.debug('delete %r' % file_in_entry_path)
+ os.remove(file_in_entry_path)
+
+ existing_file = self._get_file_from_checksum(checksum)
+ logging.debug('link %r -> %r' % \
+ (existing_file, file_in_entry_path))
+ os.link(existing_file, file_in_entry_path)
+
+ self._add_checksum_entry(uid, checksum)
+ else:
+ self._create_checksum_dir(checksum)
+ self._add_checksum_entry(uid, checksum)
+
+ os.remove(os.path.join(queue_path, uid))
+
+ if len(queue) <= 1:
+ self._enqueue_checksum_id = None
+ return False
+ else:
+ return True
+
+ def _calculate_md5sum(self, path):
+ in_, out = os.popen2(['md5sum', path])
+ return out.read().split(' ', 1)[0]
- def retrieve(self, uid, dir_path, user_id):
+ def retrieve(self, uid, user_id):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
file_path = os.path.join(dir_path, uid)
if not os.path.exists(file_path):
return ''
@@ -44,8 +162,8 @@ class FileStore(object):
'uid_to_instance_dir', str(user_id))
else:
profile = os.environ.get('SUGAR_PROFILE', 'default')
- destination_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile,
- 'data')
+ destination_dir = os.path.join(os.path.expanduser('~'), '.sugar',
+ profile, 'data')
if not os.path.exists(destination_dir):
os.makedirs(destination_dir)
@@ -71,13 +189,15 @@ class FileStore(object):
return destination_path
- def delete(self, uid, dir_path):
+ def delete(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
file_path = os.path.join(dir_path, uid)
if os.path.exists(file_path):
+ self._remove_checksum_entry(uid)
os.remove(file_path)
class AsyncCopy:
- CHUNK_SIZE=65536
+ CHUNK_SIZE = 65536
def __init__(self, src, dest, completion):
self.src = src
@@ -100,10 +220,11 @@ class AsyncCopy:
# error writing data to file?
if count < len(data):
- logging.error("AC: Error writing %s -> %s: wrote less than expected" % \
- (self.src, self.dest))
+ logging.error('AC: Error writing %s -> %s: wrote less than '
+ 'expected' % (self.src, self.dest))
self._cleanup()
- self.completion(RuntimeError("Error writing data to destination file"))
+ self.completion(RuntimeError(
+ 'Error writing data to destination file'))
return False
# FIXME: emit progress here
@@ -114,7 +235,8 @@ class AsyncCopy:
self.completion(None)
return False
except Exception, err:
- logging.error("AC: Error copying %s -> %s: %r" % (self.src, self.dest, err))
+ logging.error("AC: Error copying %s -> %s: %r" % \
+ (self.src, self.dest, err))
self._cleanup()
self.completion(err)
return False
@@ -123,11 +245,11 @@ class AsyncCopy:
def start(self):
self.src_fp = os.open(self.src, os.O_RDONLY)
- self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, 0644)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT,
+ 0644)
stat = os.fstat(self.src_fp)
self.size = stat[6]
- self.tstart = time.time()
- sid = gobject.idle_add(self._copy_block)
+ gobject.idle_add(self._copy_block)
diff --git a/src/olpc/datastore/indexstore.py b/src/olpc/datastore/indexstore.py
index 549b742..0153174 100644
--- a/src/olpc/datastore/indexstore.py
+++ b/src/olpc/datastore/indexstore.py
@@ -1,4 +1,3 @@
-import os
import logging
import time
import sys
@@ -6,6 +5,8 @@ import sys
import xapian
from xapian import WritableDatabase, Document, Enquire, Query, QueryParser
+from olpc.datastore import layoutmanager
+
_MAX_LIMIT = 4096
_VALUE_UID = 0
@@ -22,8 +23,8 @@ _PREFIX_MIME_TYPE = 'M'
_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview']
class IndexStore(object):
- def __init__(self, root_path):
- index_path = os.path.join(root_path, 'index')
+ def __init__(self):
+ index_path = layoutmanager.get_instance().get_index_path()
self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN)
def _document_exists(self, uid):
@@ -156,7 +157,8 @@ class IndexStore(object):
start = time.mktime(time.strptime(start, DATE_FORMAT))
end = mtime_range['end'][:-7]
- # FIXME: this will give an unexpected result if the journal is in a different timezone
+ # FIXME: this will give an unexpected result if the journal is in a
+ # different timezone
end = time.mktime(time.strptime(end, DATE_FORMAT))
query['timestamp'] = {'start': int(start), 'end': int(end)}
diff --git a/src/olpc/datastore/layoutmanager.py b/src/olpc/datastore/layoutmanager.py
new file mode 100644
index 0000000..e9c435e
--- /dev/null
+++ b/src/olpc/datastore/layoutmanager.py
@@ -0,0 +1,36 @@
+import os
+
+class LayoutManager(object):
+ def __init__(self):
+ profile = os.environ.get('SUGAR_PROFILE', 'default')
+ base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
+
+ self._root_path = os.path.join(base_dir, 'datastore2')
+
+ self._create_if_needed(self._root_path)
+ self._create_if_needed(self.get_checksums_dir())
+ self._create_if_needed(self.get_queue_path())
+
+ def _create_if_needed(self, path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+ def get_entry_path(self, uid):
+ return os.path.join(self._root_path, uid[:2], uid)
+
+ def get_index_path(self):
+ return os.path.join(self._root_path, 'index')
+
+ def get_checksums_dir(self):
+ return os.path.join(self._root_path, 'checksums')
+
+ def get_queue_path(self):
+ return os.path.join(self.get_checksums_dir(), 'queue')
+
+_instance = None
+def get_instance():
+ global _instance
+ if _instance is None:
+ _instance = LayoutManager()
+ return _instance
+
diff --git a/src/olpc/datastore/metadatastore.py b/src/olpc/datastore/metadatastore.py
index d0e1e5a..f15f1af 100644
--- a/src/olpc/datastore/metadatastore.py
+++ b/src/olpc/datastore/metadatastore.py
@@ -8,12 +8,15 @@ except ImportError:
import simplejson
has_cjson = False
+from olpc.datastore import layoutmanager
+
MAX_SIZE = 256
class MetadataStore(object):
- def store(self, uid, metadata, dir_path):
+ def store(self, uid, metadata):
metadata = metadata.copy()
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
@@ -28,10 +31,11 @@ class MetadataStore(object):
metadata['uid'] = uid
self._encode(metadata, os.path.join(dir_path, 'metadata'))
- def retrieve(self, uid, dir_path, properties=[]):
+ def retrieve(self, uid, properties=None):
import time
t = time.time()
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
if not os.path.exists(dir_path):
raise ValueError('Unknown object: %r' % uid)
@@ -42,7 +46,7 @@ class MetadataStore(object):
metadata = {}
if properties:
- for key, value in metadata.items():
+ for key, value_ in metadata.items():
if key not in properties:
del metadata[key]
@@ -53,14 +57,15 @@ class MetadataStore(object):
continue
file_path = os.path.join(extra_metadata_dir, key)
if not os.path.isdir(file_path):
- # TODO: This class shouldn't know anything about dbus, probably.
+ # TODO: This class shouldn't know anything about dbus.
import dbus
metadata[key] = dbus.ByteArray(open(file_path).read())
logging.debug('retrieve metadata: %r' % (time.time() - t))
return metadata
- def delete(self, uid, dir_path):
+ def delete(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
metadata_path = os.path.join(dir_path, 'metadata')
if os.path.isfile(metadata_path):
os.remove(os.path.join(dir_path, 'metadata'))
@@ -71,7 +76,7 @@ class MetadataStore(object):
if os.path.isdir(extra_metadata_path):
for key in os.listdir(extra_metadata_path):
os.remove(os.path.join(extra_metadata_path, key))
- os.removedirs(os.path.join(dir_path, 'extra_metadata'))
+ os.rmdir(os.path.join(dir_path, 'extra_metadata'))
else:
logging.warning('%s is not a valid path' % extra_metadata_path)
@@ -85,7 +90,7 @@ class MetadataStore(object):
def _is_unicode(self, string):
try:
- temp = string.decode('utf-8')
+ string.decode('utf-8')
return True
except UnicodeDecodeError:
return False