Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/src/carquinyol
diff options
context:
space:
mode:
Diffstat (limited to 'src/carquinyol')
-rw-r--r--src/carquinyol/Makefile.am23
-rw-r--r--src/carquinyol/__init__.py1
-rw-r--r--src/carquinyol/datastore.py315
-rw-r--r--src/carquinyol/filestore.py230
-rw-r--r--src/carquinyol/indexstore.py238
-rw-r--r--src/carquinyol/layoutmanager.py102
-rw-r--r--src/carquinyol/metadatareader.c241
-rw-r--r--src/carquinyol/metadatastore.py64
-rw-r--r--src/carquinyol/migration.py102
-rw-r--r--src/carquinyol/optimizer.py166
10 files changed, 1482 insertions, 0 deletions
diff --git a/src/carquinyol/Makefile.am b/src/carquinyol/Makefile.am
new file mode 100644
index 0000000..7c56174
--- /dev/null
+++ b/src/carquinyol/Makefile.am
@@ -0,0 +1,23 @@
+datastoredir = $(pythondir)/carquinyol
+datastore_PYTHON = \
+ __init__.py \
+ datastore.py \
+ filestore.py \
+ indexstore.py \
+ layoutmanager.py \
+ metadatastore.py \
+ migration.py \
+ optimizer.py
+
+AM_CPPFLAGS = \
+ $(WARN_CFLAGS) \
+ $(EXT_CFLAGS) \
+ $(PYTHON_INCLUDES)
+
+AM_LDFLAGS = -module -avoid-version
+
+pkgpyexecdir = $(pythondir)/carquinyol
+pkgpyexec_LTLIBRARIES = metadatareader.la
+
+metadatareader_la_SOURCES = \
+ metadatareader.c
diff --git a/src/carquinyol/__init__.py b/src/carquinyol/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/carquinyol/__init__.py
@@ -0,0 +1 @@
+
diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py
new file mode 100644
index 0000000..a118e03
--- /dev/null
+++ b/src/carquinyol/datastore.py
@@ -0,0 +1,315 @@
+# Copyright (C) 2008, One Laptop Per Child
+# Based on code Copyright (C) 2007, ObjectRealms, LLC
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import logging
+import uuid
+import time
+import os
+import traceback
+
+import dbus
+import gobject
+
+from sugar import mime
+
+from carquinyol import layoutmanager
+from carquinyol import migration
+from carquinyol.layoutmanager import MAX_QUERY_LIMIT
+from carquinyol.metadatastore import MetadataStore
+from carquinyol.indexstore import IndexStore
+from carquinyol.filestore import FileStore
+from carquinyol.optimizer import Optimizer
+
+# the name used by the logger
+DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore'
+
+DS_SERVICE = "org.laptop.sugar.DataStore"
+DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore"
+DS_OBJECT_PATH = "/org/laptop/sugar/DataStore"
+
+logger = logging.getLogger(DS_LOG_CHANNEL)
+
+class DataStore(dbus.service.Object):
+ """D-Bus API and logic for connecting all the other components.
+ """
+ def __init__(self, **options):
+ bus_name = dbus.service.BusName(DS_SERVICE,
+ bus=dbus.SessionBus(),
+ replace_existing=False,
+ allow_replacement=False)
+ dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH)
+
+ layout_manager = layoutmanager.get_instance()
+ if layout_manager.get_version() == 0:
+ migration.migrate_from_0()
+ layout_manager.set_version(1)
+ layout_manager.index_updated = False
+
+ self._metadata_store = MetadataStore()
+
+ self._index_store = IndexStore()
+ try:
+ self._index_store.open_index()
+ except Exception:
+ logging.error('Failed to open index, will rebuild\n%s' \
+ % traceback.format_exc())
+ layout_manager.index_updated = False
+ self._index_store.remove_index()
+ self._index_store.open_index()
+
+ self._file_store = FileStore()
+
+ if not layout_manager.index_updated:
+ logging.debug('Index is not up-to-date, will update')
+ self._rebuild_index()
+
+ self._optimizer = Optimizer(self._file_store, self._metadata_store)
+
+ def _rebuild_index(self):
+ uids = layoutmanager.get_instance().find_all()
+ logging.debug('Going to update the index with uids %r' % uids)
+ gobject.idle_add(lambda: self.__rebuild_index_cb(uids),
+ priority=gobject.PRIORITY_LOW)
+
+ def __rebuild_index_cb(self, uids):
+ if uids:
+ uid = uids.pop()
+
+ logging.debug('Updating entry %r in index. %d to go.' % \
+ (uid, len(uids)))
+
+ if not self._index_store.contains(uid):
+ try:
+ props = self._metadata_store.retrieve(uid)
+ self._index_store.store(uid, props)
+ except Exception:
+ logging.error('Error processing %r\n%s.' \
+ % (uid, traceback.format_exc()))
+
+ if not uids:
+ logging.debug('Finished updating index.')
+ layoutmanager.get_instance().index_updated = True
+ return False
+ else:
+ return True
+
+ def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \
+ (async_cb, async_err_cb, uid, exc))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Created(uid)
+ self._optimizer.optimize(uid)
+ logger.debug("created %s" % uid)
+ async_cb(uid)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='a{sv}sb',
+ out_signature='s',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def create(self, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ uid = str(uuid.uuid4())
+ logging.debug('datastore.create %r' % uid)
+
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._create_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Created(self, uid):
+ pass
+
+ def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None):
+ logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \
+ (async_cb, async_err_cb, exc))
+ if exc is not None:
+ async_err_cb(exc)
+ return
+
+ self.Updated(uid)
+ self._optimizer.optimize(uid)
+ logger.debug("updated %s" % uid)
+ async_cb()
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}sb',
+ out_signature='',
+ async_callbacks=('async_cb', 'async_err_cb'),
+ byte_arrays=True)
+ def update(self, uid, props, file_path, transfer_ownership,
+ async_cb, async_err_cb):
+ logging.debug('datastore.update %r' % uid)
+
+ if not props.get('timestamp', ''):
+ props['timestamp'] = int(time.time())
+
+ self._metadata_store.store(uid, props)
+ self._index_store.store(uid, props)
+
+ if os.path.exists(self._file_store.get_file_path(uid)) and \
+ (not file_path or os.path.exists(file_path)):
+ self._optimizer.remove(uid)
+ self._file_store.store(uid, file_path, transfer_ownership,
+ lambda *args: self._update_completion_cb(async_cb,
+ async_err_cb,
+ uid,
+ *args))
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Updated(self, uid):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='a{sv}as',
+ out_signature='aa{sv}u')
+ def find(self, query, properties):
+ logging.debug('datastore.find %r' % query)
+ t = time.time()
+
+ if layoutmanager.get_instance().index_updated:
+ try:
+ uids, count = self._index_store.find(query)
+ except Exception:
+ logging.error('Failed to query index, will rebuild\n%s' \
+ % traceback.format_exc())
+ layoutmanager.get_instance().index_updated = False
+ self._index_store.close_index()
+ self._index_store.remove_index()
+ self._index_store.open_index()
+ self._rebuild_index()
+
+ if not layoutmanager.get_instance().index_updated:
+ logging.warning('Index updating, returning all entries')
+
+ uids = layoutmanager.get_instance().find_all()
+ count = len(uids)
+
+ offset = query.get('offset', 0)
+ limit = query.get('limit', MAX_QUERY_LIMIT)
+ uids = uids[offset:offset + limit]
+
+ entries = []
+ for uid in uids:
+ metadata = self._metadata_store.retrieve(uid, properties)
+ entries.append(metadata)
+ logger.debug('find(): %r' % (time.time() - t))
+ return entries, count
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='s',
+ sender_keyword='sender')
+ def get_filename(self, uid, sender=None):
+ logging.debug('datastore.get_filename %r' % uid)
+ user_id = dbus.Bus().get_unix_user(sender)
+ extension = self._get_extension(uid)
+ return self._file_store.retrieve(uid, user_id, extension)
+
+ def _get_extension(self, uid):
+ mime_type = self._metadata_store.get_property(uid, 'mime_type')
+ if mime_type is None or not mime_type:
+ return ''
+ return mime.get_primary_extension(mime_type)
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='a{sv}')
+ def get_properties(self, uid):
+ logging.debug('datastore.get_properties %r' % uid)
+ metadata = self._metadata_store.retrieve(uid)
+ return metadata
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='sa{sv}',
+ out_signature='as')
+ def get_uniquevaluesfor(self, propertyname, query=None):
+ if propertyname != 'activity':
+ raise ValueError('Only "activity" is a supported property name')
+ if query:
+ raise ValueError('The query parameter is not supported')
+ if layoutmanager.get_instance().index_updated:
+ return self._index_store.get_activities()
+ else:
+ logging.warning('Index updating, returning an empty list')
+ return []
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature='s',
+ out_signature='')
+ def delete(self, uid):
+ self._optimizer.remove(uid)
+
+ self._index_store.delete(uid)
+ self._file_store.delete(uid)
+ self._metadata_store.delete(uid)
+
+ entry_path = layoutmanager.get_instance().get_entry_path(uid)
+ os.removedirs(entry_path)
+
+ self.Deleted(uid)
+ logger.debug("deleted %s" % uid)
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="s")
+ def Deleted(self, uid):
+ pass
+
+ def stop(self):
+ """shutdown the service"""
+ self._index_store.close_index()
+ self.Stopped()
+
+ @dbus.service.signal(DS_DBUS_INTERFACE)
+ def Stopped(self):
+ pass
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="sa{sv}",
+ out_signature='s')
+ def mount(self, uri, options=None):
+ return ''
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="",
+ out_signature="aa{sv}")
+ def mounts(self):
+ return [{'id': 1}]
+
+ @dbus.service.method(DS_DBUS_INTERFACE,
+ in_signature="s",
+ out_signature="")
+ def unmount(self, mountpoint_id):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+ def Mounted(self, descriptor):
+ pass
+
+ @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}")
+ def Unmounted(self, descriptor):
+ pass
+
diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py
new file mode 100644
index 0000000..f88c531
--- /dev/null
+++ b/src/carquinyol/filestore.py
@@ -0,0 +1,230 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import errno
+import logging
+import tempfile
+
+import gobject
+
+from carquinyol import layoutmanager
+
+class FileStore(object):
+ """Handle the storage of one file per entry.
+ """
+ # TODO: add protection against store and retrieve operations on entries
+ # that are being processed async.
+
+ def store(self, uid, file_path, transfer_ownership, completion_cb):
+ """Store a file for a given entry.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ if not os.path.exists(dir_path):
+ os.makedirs(dir_path)
+
+ destination_path = os.path.join(dir_path, 'data')
+ if file_path:
+ if not os.path.isfile(file_path):
+ raise ValueError('No file at %r' % file_path)
+ if transfer_ownership:
+ try:
+ logging.debug('FileStore moving from %r to %r' % \
+ (file_path, destination_path))
+ os.rename(file_path, destination_path)
+ completion_cb()
+ except OSError, e:
+ if e.errno == errno.EXDEV:
+ self._async_copy(file_path, destination_path,
+ completion_cb)
+ else:
+ raise
+ else:
+ self._async_copy(file_path, destination_path, completion_cb)
+ """
+ TODO: How can we support deleting the file of an entry?
+ elif not file_path and os.path.exists(destination_path):
+ logging.debug('FileStore: deleting %r' % destination_path)
+ os.remove(destination_path)
+ completion_cb()
+ """
+ else:
+ logging.debug('FileStore: Nothing to do')
+ completion_cb()
+
+ def _async_copy(self, file_path, destination_path, completion_cb):
+ """Start copying a file asynchronously.
+
+ """
+ logging.debug('FileStore copying from %r to %r' % \
+ (file_path, destination_path))
+ async_copy = AsyncCopy(file_path, destination_path, completion_cb)
+ async_copy.start()
+
+ def retrieve(self, uid, user_id, extension):
+ """Place the file associated with a given entry into a directory where
+ the user can read it. The caller is responsible for deleting this file.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_path = os.path.join(dir_path, 'data')
+ if not os.path.exists(file_path):
+ logging.debug('Entry %r does not have any file' % uid)
+ return ''
+
+ use_instance_dir = os.path.exists('/etc/olpc-security') and \
+ os.getuid() != user_id
+ if use_instance_dir:
+ if not user_id:
+ raise ValueError('Could not determine the current user uid.')
+ destination_dir = os.path.join(os.environ['HOME'], 'isolation', '1',
+ 'uid_to_instance_dir', str(user_id))
+ else:
+ profile = os.environ.get('SUGAR_PROFILE', 'default')
+ destination_dir = os.path.join(os.path.expanduser('~'), '.sugar',
+ profile, 'data')
+ if not os.path.exists(destination_dir):
+ os.makedirs(destination_dir)
+
+ if extension is None:
+ extension = ''
+ elif extension:
+ extension = '.' + extension
+
+ destination_path = os.path.join(destination_dir, uid + extension)
+
+ attempt = 1
+ while os.path.exists(destination_path):
+ if attempt > 10:
+ fd_, destination_path = tempfile.mkstemp(prefix=uid,
+ suffix=extension,
+ dir=destination_dir)
+ del fd_
+ os.unlink(destination_path)
+ break
+ else:
+ file_name = '%s_%s%s' % (uid, attempt, extension)
+ destination_path = os.path.join(destination_dir, file_name)
+ attempt += 1
+
+ # Try to hard link from the original file to the targetpath. This can
+ # fail if the file is in a different filesystem. Do a symlink instead.
+ try:
+ os.link(file_path, destination_path)
+ except OSError, e:
+ if e.errno == errno.EXDEV:
+ os.symlink(file_path, destination_path)
+ else:
+ raise
+
+ # Try to make the original file readable. This can fail if the file is
+ # in a FAT filesystem.
+ try:
+ os.chmod(file_path, 0604)
+ except OSError, e:
+ if e.errno != errno.EPERM:
+ raise
+
+ return destination_path
+
+ def get_file_path(self, uid):
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ return os.path.join(dir_path, 'data')
+
+ def delete(self, uid):
+ """Remove the file associated to a given entry.
+
+ """
+ dir_path = layoutmanager.get_instance().get_entry_path(uid)
+ file_path = os.path.join(dir_path, 'data')
+ if os.path.exists(file_path):
+ os.remove(file_path)
+
+ def hard_link_entry(self, new_uid, existing_uid):
+ existing_file = os.path.join(
+ layoutmanager.get_instance().get_entry_path(existing_uid),
+ 'data')
+ new_file = os.path.join(
+ layoutmanager.get_instance().get_entry_path(new_uid),
+ 'data')
+
+ logging.debug('removing %r' % new_file)
+ os.remove(new_file)
+
+ logging.debug('hard linking %r -> %r' % (new_file, existing_file))
+ os.link(existing_file, new_file)
+
+class AsyncCopy(object):
+ """Copy a file in chunks in the idle loop.
+
+ """
+ CHUNK_SIZE = 65536
+
+ def __init__(self, src, dest, completion):
+ self.src = src
+ self.dest = dest
+ self.completion = completion
+ self.src_fp = -1
+ self.dest_fp = -1
+ self.written = 0
+ self.size = 0
+
+ def _cleanup(self):
+ os.close(self.src_fp)
+ os.close(self.dest_fp)
+
+ def _copy_block(self, user_data=None):
+ try:
+ data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE)
+ count = os.write(self.dest_fp, data)
+ self.written += len(data)
+
+ # error writing data to file?
+ if count < len(data):
+ logging.error('AC: Error writing %s -> %s: wrote less than '
+ 'expected' % (self.src, self.dest))
+ self._cleanup()
+ self.completion(RuntimeError(
+ 'Error writing data to destination file'))
+ return False
+
+ # FIXME: emit progress here
+
+ # done?
+ if len(data) < AsyncCopy.CHUNK_SIZE:
+ self._cleanup()
+ self.completion(None)
+ return False
+ except Exception, err:
+ logging.error("AC: Error copying %s -> %s: %r" % \
+ (self.src, self.dest, err))
+ self._cleanup()
+ self.completion(err)
+ return False
+
+ return True
+
+ def start(self):
+ self.src_fp = os.open(self.src, os.O_RDONLY)
+ self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT,
+ 0644)
+
+ stat = os.fstat(self.src_fp)
+ self.size = stat[6]
+
+ gobject.idle_add(self._copy_block)
+
diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py
new file mode 100644
index 0000000..62aebb4
--- /dev/null
+++ b/src/carquinyol/indexstore.py
@@ -0,0 +1,238 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import logging
+import os
+
+import gobject
+import xapian
+from xapian import WritableDatabase, Document, Enquire, Query, QueryParser
+
+from carquinyol import layoutmanager
+from carquinyol.layoutmanager import MAX_QUERY_LIMIT
+
+_VALUE_UID = 0
+_VALUE_TIMESTAMP = 1
+
+_PREFIX_UID = 'Q'
+_PREFIX_ACTIVITY = 'A'
+_PREFIX_ACTIVITY_ID = 'I'
+_PREFIX_MIME_TYPE = 'M'
+_PREFIX_KEEP = 'K'
+
+# Force a flush every _n_ changes to the db
+_FLUSH_THRESHOLD = 20
+
+# Force a flush after _n_ seconds since the last change to the db
+_FLUSH_TIMEOUT = 60
+
+_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview']
+
+_MAX_RESULTS = int(2 ** 31 - 1)
+
+class IndexStore(object):
+ """Index metadata and provide rich query facilities on it.
+ """
+ def __init__(self):
+ self._database = None
+ self._flush_timeout = None
+ self._pending_writes = 0
+
+ def open_index(self):
+ index_path = layoutmanager.get_instance().get_index_path()
+ self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN)
+
+ def close_index(self):
+ self._database.flush()
+ self._database = None
+
+ def remove_index(self):
+ index_path = layoutmanager.get_instance().get_index_path()
+ if not os.path.exists(index_path):
+ return
+ for f in os.listdir(index_path):
+ os.remove(os.path.join(index_path, f))
+
+ def contains(self, uid):
+ postings = self._database.postlist(_PREFIX_UID + uid)
+ try:
+ postlist_item = postings.next()
+ except StopIteration:
+ return False
+ return True
+
+ def store(self, uid, properties):
+ document = Document()
+ document.add_term(_PREFIX_UID + uid)
+ document.add_term(_PREFIX_ACTIVITY + properties.get('activity', ''))
+ document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', ''))
+ document.add_term(_PREFIX_ACTIVITY_ID +
+ properties.get('activity_id', ''))
+ document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0)))
+
+ document.add_value(_VALUE_UID, uid)
+ document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp']))
+
+ term_generator = xapian.TermGenerator()
+
+ # TODO: we should do stemming, but in which language?
+ #if language is not None:
+ # term_generator.set_stemmer(_xapian.Stem(language))
+
+ # TODO: we should use a stopper
+ #if stop is not None:
+ # stopper = _xapian.SimpleStopper()
+ # for term in stop:
+ # stopper.add (term)
+ # term_generator.set_stopper (stopper)
+
+ term_generator.set_document(document)
+ term_generator.index_text_without_positions(
+ self._extract_text(properties), 1, '')
+
+ if not self.contains(uid):
+ self._database.add_document(document)
+ else:
+ self._database.replace_document(_PREFIX_UID + uid, document)
+ self._flush()
+
+ def _extract_text(self, properties):
+ text = ''
+ for key, value in properties.items():
+ if key not in _PROPERTIES_NOT_TO_INDEX:
+ if text:
+ text += ' '
+ if isinstance(value, unicode):
+ value = value.encode('utf-8')
+ elif not isinstance(value, basestring):
+ value = str(value)
+ text += value
+ return text
+
+ def find(self, query):
+ enquire = Enquire(self._database)
+ enquire.set_query(self._parse_query(query))
+
+ offset = query.get('offset', 0)
+ limit = query.get('limit', MAX_QUERY_LIMIT)
+
+ # This will assure that the results count is exact.
+ check_at_least = offset + limit + 1
+
+ enquire.set_sort_by_value(_VALUE_TIMESTAMP, True)
+
+ query_result = enquire.get_mset(offset, limit, check_at_least)
+ total_count = query_result.get_matches_estimated()
+
+ uids = []
+ for hit in query_result:
+ uids.append(hit.document.get_value(_VALUE_UID))
+
+ return (uids, total_count)
+
+ def _parse_query(self, query_dict):
+ logging.debug('_parse_query %r' % query_dict)
+ queries = []
+
+ query_str = query_dict.pop('query', None)
+ if query_str is not None:
+ query_parser = QueryParser()
+ query_parser.set_database(self._database)
+ #query_parser.set_default_op(Query.OP_AND)
+
+ # TODO: we should do stemming, but in which language?
+ #query_parser.set_stemmer(_xapian.Stem(lang))
+ #query_parser.set_stemming_strategy(qp.STEM_SOME)
+
+ query = query_parser.parse_query(
+ query_str,
+ QueryParser.FLAG_PHRASE |
+ QueryParser.FLAG_BOOLEAN |
+ QueryParser.FLAG_LOVEHATE |
+ QueryParser.FLAG_WILDCARD,
+ '')
+
+ queries.append(query)
+
+ timestamp = query_dict.pop('timestamp', None)
+ if timestamp is not None:
+ start = str(timestamp.pop('start', 0))
+ end = str(timestamp.pop('end', _MAX_RESULTS))
+ query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end)
+ queries.append(query)
+
+ uid = query_dict.pop('uid', None)
+ if uid is not None:
+ queries.append(Query(_PREFIX_UID + uid))
+
+ activity = query_dict.pop('activity', None)
+ if activity is not None:
+ queries.append(Query(_PREFIX_ACTIVITY + activity))
+
+ activity_id = query_dict.pop('activity_id', None)
+ if activity_id is not None:
+ query = Query(_PREFIX_ACTIVITY_ID + activity_id)
+ queries.append(query)
+
+ keep = query_dict.pop('keep', None)
+ if keep is not None:
+ query = Query(_PREFIX_KEEP + str(keep))
+ queries.append(query)
+
+ mime_type = query_dict.pop('mime_type', None)
+ if mime_type is not None:
+ mime_queries = []
+ for mime_type in mime_type:
+ mime_queries.append(Query(_PREFIX_MIME_TYPE + mime_type))
+ queries.append(Query(Query.OP_OR, mime_queries))
+
+ if not queries:
+ queries.append(Query(''))
+
+ if query_dict:
+ logging.warning('Unknown term(s): %r' % query_dict)
+
+ return Query(Query.OP_AND, queries)
+
+ def delete(self, uid):
+ self._database.delete_document(_PREFIX_UID + uid)
+
+ def get_activities(self):
+ activities = []
+ for term in self._database.allterms(_PREFIX_ACTIVITY):
+ activities.append(term.term[len(_PREFIX_ACTIVITY):])
+ return activities
+
+ def _flush_timeout_cb(self):
+ self._flush(True)
+ return False
+
+ def _flush(self, force=False):
+ """Called after any database mutation"""
+ logging.debug('IndexStore.flush: %r %r' % (force, self._pending_writes))
+
+ if self._flush_timeout is not None:
+ gobject.source_remove(self._flush_timeout)
+ self._flush_timeout = None
+
+ self._pending_writes += 1
+ if force or self._pending_writes > _FLUSH_THRESHOLD:
+ self._database.flush()
+ self._pending_writes = 0
+ else:
+ self._flush_timeout = gobject.timeout_add(_FLUSH_TIMEOUT * 1000,
+ self._flush_timeout_cb)
+
diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py
new file mode 100644
index 0000000..dc3fde6
--- /dev/null
+++ b/src/carquinyol/layoutmanager.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+
+MAX_QUERY_LIMIT = 40960
+
+class LayoutManager(object):
+ """Provide the logic about how entries are stored inside the datastore
+ directory
+ """
+ def __init__(self):
+ profile = os.environ.get('SUGAR_PROFILE', 'default')
+ base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile)
+
+ self._root_path = os.path.join(base_dir, 'datastore')
+
+ if not os.path.exists(self._root_path):
+ os.makedirs(self._root_path)
+ self.set_version(1)
+
+ self._create_if_needed(self.get_checksums_dir())
+ self._create_if_needed(self.get_queue_path())
+
+ index_updated_path = os.path.join(self._root_path, 'index_updated')
+ self._index_updated = os.path.exists(index_updated_path)
+
+ def _create_if_needed(self, path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+ def get_version(self):
+ version_path = os.path.join(self._root_path, 'version')
+ version = 0
+ if os.path.exists(version_path):
+ version = int(open(version_path, 'r').read())
+ return version
+
+ def set_version(self, version):
+ version_path = os.path.join(self._root_path, 'version')
+ open(version_path, 'w').write(str(version))
+
+ def get_entry_path(self, uid):
+ # os.path.join() is just too slow
+ return '%s/%s/%s' % (self._root_path, uid[:2], uid)
+
+ def get_root_path(self):
+ return self._root_path
+
+ def get_index_path(self):
+ return os.path.join(self._root_path, 'index')
+
+ def get_checksums_dir(self):
+ return os.path.join(self._root_path, 'checksums')
+
+ def get_queue_path(self):
+ return os.path.join(self.get_checksums_dir(), 'queue')
+
+ def _is_index_updated(self):
+ return self._index_updated
+
+ def _set_index_updated(self, index_updated):
+ if index_updated != self._index_updated:
+ self._index_updated = index_updated
+
+ index_updated_path = os.path.join(self._root_path, 'index_updated')
+ if os.path.exists(index_updated_path):
+ os.remove(index_updated_path)
+ else:
+ open(index_updated_path, 'w').close()
+
+ index_updated = property(_is_index_updated, _set_index_updated)
+
+ def find_all(self):
+ uids = []
+ for f in os.listdir(self._root_path):
+ if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2:
+ for g in os.listdir(os.path.join(self._root_path, f)):
+ if len(g) == 36:
+ uids.append(g)
+ return uids
+
+_instance = None
+def get_instance():
+ global _instance
+ if _instance is None:
+ _instance = LayoutManager()
+ return _instance
+
diff --git a/src/carquinyol/metadatareader.c b/src/carquinyol/metadatareader.c
new file mode 100644
index 0000000..08be17e
--- /dev/null
+++ b/src/carquinyol/metadatareader.c
@@ -0,0 +1,241 @@
+#include "Python.h"
+
+#include <dirent.h>
+
+// TODO: put it in a place where python can use it when writing metadata
+#define MAX_PROPERTY_LENGTH 500 * 1024
+
+static PyObject *byte_array_type = NULL;
+
+int
+add_property(char *metadata_path, char *property_name, PyObject *dict,
+ int must_exist)
+{
+ int file_path_size;
+ char *file_path = NULL;
+ FILE *file = NULL;
+ long file_size;
+ char *value_buf = NULL;
+ PyObject *value = NULL;
+ struct stat file_stat;
+
+ // Build path of the property file
+ file_path_size = strlen(metadata_path) + 1 + strlen(property_name) + 1;
+ file_path = PyMem_Malloc(file_path_size);
+ if (file_path == NULL) {
+ PyErr_NoMemory();
+ return 0;
+ }
+ snprintf (file_path, file_path_size, "%s/%s", metadata_path, property_name);
+
+ if ((!must_exist) && (stat(file_path, &file_stat) != 0)) {
+ PyMem_Free(file_path);
+ return 1;
+ }
+
+ file = fopen(file_path, "r");
+ if (file == NULL) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Cannot open property file %s: %s",
+ file_path, strerror(errno));
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ // Get file size
+ fseek (file, 0, SEEK_END);
+ file_size = ftell (file);
+ rewind (file);
+
+ if (file_size == 0) {
+ // Empty property
+ fclose(file);
+ file = NULL;
+
+ value = PyString_FromString("");
+ if (value == NULL) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to convert value to python string");
+ goto cleanup;
+ }
+ } else {
+ if (file_size > MAX_PROPERTY_LENGTH) {
+ PyErr_SetString(PyExc_ValueError, "Property file too big");
+ goto cleanup;
+ }
+
+ // Read the whole file
+ value_buf = PyMem_Malloc(file_size);
+ if (value_buf == NULL) {
+ PyErr_NoMemory();
+ goto cleanup;
+ }
+ long read_size = fread(value_buf, 1, file_size, file);
+ if (read_size < file_size) {
+ char buf[256];
+ snprintf(buf, sizeof(buf),
+ "Error while reading property file %s", file_path);
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ fclose(file);
+ file = NULL;
+
+ // Convert value to dbus.ByteArray
+ PyObject *args = Py_BuildValue("(s#)", value_buf, file_size);
+
+ PyMem_Free(value_buf);
+ value_buf = NULL;
+
+ value = PyObject_CallObject(byte_array_type, args);
+ if (value == NULL) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to convert value to dbus.ByteArray");
+ goto cleanup;
+ }
+ }
+
+ // Add property to the metadata dict
+ if (PyDict_SetItemString(dict, property_name, value) == -1) {
+ PyErr_SetString(PyExc_ValueError,
+ "Failed to add property to dictionary");
+ goto cleanup;
+ }
+
+ Py_DECREF(value);
+ PyMem_Free(file_path);
+
+ return 1;
+
+cleanup:
+ if (file_path) {
+ PyMem_Free(file_path);
+ }
+ if (value_buf) {
+ PyMem_Free(value_buf);
+ }
+ if (file) {
+ fclose(file);
+ }
+ if (value) {
+ Py_DECREF(value);
+ }
+ return 0;
+}
+
+static PyObject *
+read_from_properties_list (char *metadata_path, PyObject *properties)
+{
+ PyObject *dict = PyDict_New();
+
+ int i;
+ for (i = 0; i < PyList_Size(properties); i++) {
+ PyObject *property = PyList_GetItem(properties, i);
+ char *property_name = PyString_AsString (property);
+
+ if (add_property(metadata_path, property_name, dict, 0) == 0)
+ goto cleanup;
+ }
+
+ return dict;
+
+cleanup:
+ if (dict) {
+ Py_DECREF(dict);
+ }
+ return NULL;
+}
+
+static PyObject *
+read_all_properties (char *metadata_path)
+{
+ PyObject *dict = PyDict_New();
+ DIR *dir_stream = NULL;
+ struct dirent *dir_entry = NULL;
+
+ dir_stream = opendir (metadata_path);
+ if (dir_stream == NULL) {
+ char buf[256];
+ snprintf(buf, sizeof(buf), "Couldn't open metadata directory %s",
+ metadata_path);
+ PyErr_SetString(PyExc_IOError, buf);
+ goto cleanup;
+ }
+
+ dir_entry = readdir(dir_stream);
+ while (dir_entry != NULL) {
+ // Skip . and ..
+ if (dir_entry->d_name[0] == '.' &&
+ (strlen(dir_entry->d_name) == 1 ||
+ (dir_entry->d_name[1] == '.' &&
+ strlen(dir_entry->d_name) == 2)))
+ goto next_property;
+
+ if (add_property(metadata_path, dir_entry->d_name, dict, 1) == 0)
+ goto cleanup;
+
+ next_property:
+ dir_entry = readdir(dir_stream);
+ }
+
+ closedir(dir_stream);
+
+ return dict;
+
+cleanup:
+ if (dict) {
+ Py_DECREF(dict);
+ }
+ if (dir_stream) {
+ closedir(dir_stream);
+ }
+ return NULL;
+}
+
+static PyObject *
+metadatareader_retrieve(PyObject *unused, PyObject *args)
+{
+ PyObject *dict = NULL;
+ PyObject *properties = NULL;
+ const char *dir_path = NULL;
+ char *metadata_path = NULL;
+
+ if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties))
+ return NULL;
+
+ // Build path to the metadata directory
+ int metadata_path_size = strlen(dir_path) + 10;
+ metadata_path = PyMem_Malloc(metadata_path_size);
+ if (metadata_path == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata");
+
+ if ((properties != Py_None) && (PyList_Size(properties) > 0)) {
+ dict = read_from_properties_list(metadata_path, properties);
+ } else {
+ dict = read_all_properties(metadata_path);
+ }
+
+ PyMem_Free(metadata_path);
+
+ return dict;
+}
+
+static PyMethodDef metadatareader_functions[] = {
+ {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read an entry's metadata properties as a dictionary")},
+ {NULL, NULL, 0, NULL}
+};
+
+PyMODINIT_FUNC
+initmetadatareader(void)
+{
+ PyObject* mod;
+ mod = Py_InitModule("metadatareader", metadatareader_functions);
+
+ PyObject *dbus_module = PyImport_ImportModule("dbus");
+ byte_array_type = PyObject_GetAttrString(dbus_module, "ByteArray");
+}
+
diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py
new file mode 100644
index 0000000..50981f3
--- /dev/null
+++ b/src/carquinyol/metadatastore.py
@@ -0,0 +1,64 @@
+import os
+
+from carquinyol import layoutmanager
+from carquinyol import metadatareader
+
+MAX_SIZE = 256
+
class MetadataStore(object):
    """Persist entry metadata as one file per property.

    Each entry gets a 'metadata' subdirectory inside its entry directory
    (resolved via layoutmanager); every property is stored in a file named
    after the property key, containing the raw (utf-8 encoded) value.
    """

    def store(self, uid, metadata):
        """Write all properties in `metadata` for entry `uid`.

        Previously stored properties are wiped first, so the directory ends
        up mirroring `metadata` exactly.  NOTE: mutates the caller's dict by
        setting metadata['uid'].
        """
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)

        metadata_path = os.path.join(dir_path, 'metadata')
        if not os.path.exists(metadata_path):
            os.makedirs(metadata_path)
        else:
            # Clear stale property files from a previous store of this entry.
            for key in os.listdir(metadata_path):
                os.remove(os.path.join(metadata_path, key))

        metadata['uid'] = uid
        for key, value in metadata.items():

            # Hack to support activities that still pass properties named as
            # for example title:text.
            if ':' in key:
                key = key.split(':', 1)[0]

            f = open(os.path.join(metadata_path, key), 'w')
            try:
                # Normalize to a byte string: unicode is utf-8 encoded,
                # non-string values are str()'d.
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                elif not isinstance(value, basestring):
                    value = str(value)
                f.write(value)
            finally:
                f.close()

    def retrieve(self, uid, properties=None):
        """Return the metadata dict for `uid`; `properties` optionally
        restricts which keys are read (None means all)."""
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        return metadatareader.retrieve(dir_path, properties)

    def delete(self, uid):
        """Remove all property files and the metadata directory of `uid`."""
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        metadata_path = os.path.join(dir_path, 'metadata')
        for key in os.listdir(metadata_path):
            os.remove(os.path.join(metadata_path, key))
        os.rmdir(metadata_path)

    def get_property(self, uid, key):
        """Return the raw value of one property, or None if it is unset."""
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        metadata_path = os.path.join(dir_path, 'metadata')
        property_path = os.path.join(metadata_path, key)
        if not os.path.exists(property_path):
            return None
        # Close the file explicitly instead of leaking the handle to the
        # garbage collector (the original relied on refcounting).
        f = open(property_path, 'r')
        try:
            return f.read()
        finally:
            f.close()

    def set_property(self, uid, key, value):
        """Write the raw value of one property for entry `uid`."""
        dir_path = layoutmanager.get_instance().get_entry_path(uid)
        metadata_path = os.path.join(dir_path, 'metadata')
        property_path = os.path.join(metadata_path, key)
        # Explicit close, consistent with store() above.
        f = open(property_path, 'w')
        try:
            f.write(value)
        finally:
            f.close()
+
diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py
new file mode 100644
index 0000000..228db2a
--- /dev/null
+++ b/src/carquinyol/migration.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Transform one DataStore directory in a newer format.
+"""
+
+import os
+import logging
+import shutil
+import time
+import traceback
+
+import cjson
+
+from carquinyol import layoutmanager
+
+DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
+
def migrate_from_0():
    """Convert a version-0 datastore tree (the old 'store' directory) to
    the version-1 per-entry layout, then delete the old tree."""
    logging.info('Migrating datastore from version 0 to version 1')

    root_path = layoutmanager.get_instance().get_root_path()
    old_root_path = os.path.join(root_path, 'store')
    if not os.path.exists(old_root_path):
        return

    for entry_name in os.listdir(old_root_path):
        uid, extension = os.path.splitext(entry_name)
        if extension != '.metadata':
            continue

        logging.debug('Migrating entry %r', uid)
        try:
            _migrate_metadata(root_path, old_root_path, uid)
            _migrate_file(root_path, old_root_path, uid)
            _migrate_preview(root_path, old_root_path, uid)
        except Exception:
            # One broken entry must not abort the whole migration.
            logging.error('Error while migrating entry %r: %s\n',
                          uid, traceback.format_exc())

    # Just be paranoid, it's cheap.
    if old_root_path.endswith('datastore/store'):
        shutil.rmtree(old_root_path)

    logging.info('Migration finished')
+
def _migrate_metadata(root_path, old_root_path, uid):
    """Split one entry's old JSON metadata blob into per-property files
    under the new layout's 'metadata' directory."""
    dir_path = layoutmanager.get_instance().get_entry_path(uid)
    metadata_path = os.path.join(dir_path, 'metadata')
    os.makedirs(metadata_path)

    old_metadata_path = os.path.join(old_root_path, uid + '.metadata')
    old_file = open(old_metadata_path, 'r')
    metadata = cjson.decode(old_file.read())
    old_file.close()

    if 'uid' not in metadata:
        metadata['uid'] = uid

    # Derive the new 'timestamp' property from the legacy 'mtime' string.
    if 'timestamp' not in metadata and 'mtime' in metadata:
        parsed_mtime = time.strptime(metadata['mtime'], DATE_FORMAT)
        metadata['timestamp'] = time.mktime(parsed_mtime)

    for key, value in metadata.items():
        try:
            out_file = open(os.path.join(metadata_path, key), 'w')
            try:
                # Normalize to a byte string before writing.
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                if not isinstance(value, basestring):
                    value = str(value)
                out_file.write(value)
            finally:
                out_file.close()
        except Exception:
            # Skip the broken property but keep migrating the rest.
            logging.error(
                'Error while migrating property %s of entry %s: %s\n',
                key, uid, traceback.format_exc())
+
def _migrate_file(root_path, old_root_path, uid):
    """Move the entry's data file into the new layout, if it has one."""
    old_file_path = os.path.join(old_root_path, uid)
    if not os.path.exists(old_file_path):
        return
    dir_path = layoutmanager.get_instance().get_entry_path(uid)
    os.rename(old_file_path, os.path.join(dir_path, 'data'))
+
def _migrate_preview(root_path, old_root_path, uid):
    """Move the entry's preview image into the new metadata directory.

    Entries without a preview are common; skip them silently instead of
    letting os.rename raise OSError, which the caller would log as a
    migration error for a perfectly healthy entry.
    """
    old_preview_path = os.path.join(old_root_path, 'preview', uid)
    if not os.path.exists(old_preview_path):
        return
    dir_path = layoutmanager.get_instance().get_entry_path(uid)
    metadata_path = os.path.join(dir_path, 'metadata')
    os.rename(old_preview_path,
              os.path.join(metadata_path, 'preview'))
+
diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py
new file mode 100644
index 0000000..f8a2e3e
--- /dev/null
+++ b/src/carquinyol/optimizer.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2008, One Laptop Per Child
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import errno
+import logging
+import subprocess
+
+import gobject
+
+from carquinyol import layoutmanager
+
+class Optimizer(object):
+ """Optimizes disk space usage by detecting duplicates and sharing storage.
+ """
+ def __init__(self, file_store, metadata_store):
+ self._file_store = file_store
+ self._metadata_store = metadata_store
+ self._enqueue_checksum_id = None
+
+ def optimize(self, uid):
+ """Add an entry to a queue of entries to be checked for duplicates.
+
+ """
+ if not os.path.exists(self._file_store.get_file_path(uid)):
+ return
+
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ open(os.path.join(queue_path, uid), 'w').close()
+ logging.debug('optimize %r' % os.path.join(queue_path, uid))
+
+ if self._enqueue_checksum_id is None:
+ self._enqueue_checksum_id = \
+ gobject.idle_add(self._process_entry_cb,
+ priority=gobject.PRIORITY_LOW)
+
+ def remove(self, uid):
+ """Remove any structures left from space optimization
+
+ """
+ checksum = self._metadata_store.get_property(uid, 'checksum')
+ if checksum is None:
+ return
+
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ checksum_entry_path = os.path.join(checksum_path, uid)
+
+ if os.path.exists(checksum_entry_path):
+ logging.debug('remove %r' % checksum_entry_path)
+ os.remove(checksum_entry_path)
+
+ if os.path.exists(checksum_path):
+ try:
+ os.rmdir(checksum_path)
+ logging.debug('removed %r' % checksum_path)
+ except OSError, e:
+ if e.errno != errno.ENOTEMPTY:
+ raise
+
+ def _identical_file_already_exists(self, checksum):
+ """Check if we already have files with this checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(checksum_path)
+
+ def _get_uid_from_checksum(self, checksum):
+ """Get an existing entry which file matches checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ first_uid = os.listdir(checksum_path)[0]
+ return first_uid
+
+ def _create_checksum_dir(self, checksum):
+ """Create directory that tracks files with this same checksum.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ logging.debug('create dir %r' % checksum_path)
+ os.mkdir(checksum_path)
+
+ def _add_checksum_entry(self, uid, checksum):
+ """Create a file in the checksum dir with the uid of the entry
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+
+ logging.debug('touch %r' % os.path.join(checksum_path, uid))
+ open(os.path.join(checksum_path, uid), 'w').close()
+
+ def _already_linked(self, uid, checksum):
+ """Check if this entry's file is already a hard link to the checksums
+ dir.
+
+ """
+ checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+ checksum_path = os.path.join(checksums_dir, checksum)
+ return os.path.exists(os.path.join(checksum_path, uid))
+
+ def _process_entry_cb(self):
+ """Process one item in the checksums queue by calculating its checksum,
+ checking if there exist already an identical file, and in that case
+ substituting its file with a hard link to that pre-existing file.
+
+ """
+ queue_path = layoutmanager.get_instance().get_queue_path()
+ queue = os.listdir(queue_path)
+ if queue:
+ uid = queue[0]
+ logging.debug('_process_entry_cb processing %r' % uid)
+
+ file_in_entry_path = self._file_store.get_file_path(uid)
+ if not os.path.exists(file_in_entry_path):
+ logging.info('non-existent entry in queue: %r' % uid)
+ else:
+ checksum = self._calculate_md5sum(file_in_entry_path)
+ self._metadata_store.set_property(uid, 'checksum', checksum)
+
+ if self._identical_file_already_exists(checksum):
+ if not self._already_linked(uid, checksum):
+ existing_entry_uid = \
+ self._get_uid_from_checksum(checksum)
+
+ self._file_store.hard_link_entry(uid,
+ existing_entry_uid)
+
+ self._add_checksum_entry(uid, checksum)
+ else:
+ self._create_checksum_dir(checksum)
+ self._add_checksum_entry(uid, checksum)
+
+ os.remove(os.path.join(queue_path, uid))
+
+ if len(queue) <= 1:
+ self._enqueue_checksum_id = None
+ return False
+ else:
+ return True
+
+ def _calculate_md5sum(self, path):
+ """Calculate the md5 checksum of a given file.
+
+ """
+ popen = subprocess.Popen(['md5sum', path], stdout=subprocess.PIPE)
+ stdout, stderr_ = popen.communicate()
+ return stdout.split(' ', 1)[0]
+