diff options
Diffstat (limited to 'src/carquinyol')
-rw-r--r-- | src/carquinyol/Makefile.am | 23 | ||||
-rw-r--r-- | src/carquinyol/__init__.py | 1 | ||||
-rw-r--r-- | src/carquinyol/datastore.py | 315 | ||||
-rw-r--r-- | src/carquinyol/filestore.py | 230 | ||||
-rw-r--r-- | src/carquinyol/indexstore.py | 238 | ||||
-rw-r--r-- | src/carquinyol/layoutmanager.py | 102 | ||||
-rw-r--r-- | src/carquinyol/metadatareader.c | 241 | ||||
-rw-r--r-- | src/carquinyol/metadatastore.py | 64 | ||||
-rw-r--r-- | src/carquinyol/migration.py | 102 | ||||
-rw-r--r-- | src/carquinyol/optimizer.py | 166 |
10 files changed, 1482 insertions, 0 deletions
diff --git a/src/carquinyol/Makefile.am b/src/carquinyol/Makefile.am new file mode 100644 index 0000000..7c56174 --- /dev/null +++ b/src/carquinyol/Makefile.am @@ -0,0 +1,23 @@ +datastoredir = $(pythondir)/carquinyol +datastore_PYTHON = \ + __init__.py \ + datastore.py \ + filestore.py \ + indexstore.py \ + layoutmanager.py \ + metadatastore.py \ + migration.py \ + optimizer.py + +AM_CPPFLAGS = \ + $(WARN_CFLAGS) \ + $(EXT_CFLAGS) \ + $(PYTHON_INCLUDES) + +AM_LDFLAGS = -module -avoid-version + +pkgpyexecdir = $(pythondir)/carquinyol +pkgpyexec_LTLIBRARIES = metadatareader.la + +metadatareader_la_SOURCES = \ + metadatareader.c diff --git a/src/carquinyol/__init__.py b/src/carquinyol/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/carquinyol/__init__.py @@ -0,0 +1 @@ + diff --git a/src/carquinyol/datastore.py b/src/carquinyol/datastore.py new file mode 100644 index 0000000..a118e03 --- /dev/null +++ b/src/carquinyol/datastore.py @@ -0,0 +1,315 @@ +# Copyright (C) 2008, One Laptop Per Child +# Based on code Copyright (C) 2007, ObjectRealms, LLC +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging +import uuid +import time +import os +import traceback + +import dbus +import gobject + +from sugar import mime + +from carquinyol import layoutmanager +from carquinyol import migration +from carquinyol.layoutmanager import MAX_QUERY_LIMIT +from carquinyol.metadatastore import MetadataStore +from carquinyol.indexstore import IndexStore +from carquinyol.filestore import FileStore +from carquinyol.optimizer import Optimizer + +# the name used by the logger +DS_LOG_CHANNEL = 'org.laptop.sugar.DataStore' + +DS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE = "org.laptop.sugar.DataStore" +DS_OBJECT_PATH = "/org/laptop/sugar/DataStore" + +logger = logging.getLogger(DS_LOG_CHANNEL) + +class DataStore(dbus.service.Object): + """D-Bus API and logic for connecting all the other components. 
+ """ + def __init__(self, **options): + bus_name = dbus.service.BusName(DS_SERVICE, + bus=dbus.SessionBus(), + replace_existing=False, + allow_replacement=False) + dbus.service.Object.__init__(self, bus_name, DS_OBJECT_PATH) + + layout_manager = layoutmanager.get_instance() + if layout_manager.get_version() == 0: + migration.migrate_from_0() + layout_manager.set_version(1) + layout_manager.index_updated = False + + self._metadata_store = MetadataStore() + + self._index_store = IndexStore() + try: + self._index_store.open_index() + except Exception: + logging.error('Failed to open index, will rebuild\n%s' \ + % traceback.format_exc()) + layout_manager.index_updated = False + self._index_store.remove_index() + self._index_store.open_index() + + self._file_store = FileStore() + + if not layout_manager.index_updated: + logging.debug('Index is not up-to-date, will update') + self._rebuild_index() + + self._optimizer = Optimizer(self._file_store, self._metadata_store) + + def _rebuild_index(self): + uids = layoutmanager.get_instance().find_all() + logging.debug('Going to update the index with uids %r' % uids) + gobject.idle_add(lambda: self.__rebuild_index_cb(uids), + priority=gobject.PRIORITY_LOW) + + def __rebuild_index_cb(self, uids): + if uids: + uid = uids.pop() + + logging.debug('Updating entry %r in index. %d to go.' % \ + (uid, len(uids))) + + if not self._index_store.contains(uid): + try: + props = self._metadata_store.retrieve(uid) + self._index_store.store(uid, props) + except Exception: + logging.error('Error processing %r\n%s.' 
\ + % (uid, traceback.format_exc())) + + if not uids: + logging.debug('Finished updating index.') + layoutmanager.get_instance().index_updated = True + return False + else: + return True + + def _create_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug("_create_completion_cb(%r, %r, %r, %r)" % \ + (async_cb, async_err_cb, uid, exc)) + if exc is not None: + async_err_cb(exc) + return + + self.Created(uid) + self._optimizer.optimize(uid) + logger.debug("created %s" % uid) + async_cb(uid) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}sb', + out_signature='s', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def create(self, props, file_path, transfer_ownership, + async_cb, async_err_cb): + uid = str(uuid.uuid4()) + logging.debug('datastore.create %r' % uid) + + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + + self._metadata_store.store(uid, props) + self._index_store.store(uid, props) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._create_completion_cb(async_cb, + async_err_cb, + uid, + *args)) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Created(self, uid): + pass + + def _update_completion_cb(self, async_cb, async_err_cb, uid, exc=None): + logger.debug("_update_completion_cb() called with %r / %r, exc %r" % \ + (async_cb, async_err_cb, exc)) + if exc is not None: + async_err_cb(exc) + return + + self.Updated(uid) + self._optimizer.optimize(uid) + logger.debug("updated %s" % uid) + async_cb() + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}sb', + out_signature='', + async_callbacks=('async_cb', 'async_err_cb'), + byte_arrays=True) + def update(self, uid, props, file_path, transfer_ownership, + async_cb, async_err_cb): + logging.debug('datastore.update %r' % uid) + + if not props.get('timestamp', ''): + props['timestamp'] = int(time.time()) + + self._metadata_store.store(uid, props) + 
self._index_store.store(uid, props) + + if os.path.exists(self._file_store.get_file_path(uid)) and \ + (not file_path or os.path.exists(file_path)): + self._optimizer.remove(uid) + self._file_store.store(uid, file_path, transfer_ownership, + lambda *args: self._update_completion_cb(async_cb, + async_err_cb, + uid, + *args)) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Updated(self, uid): + pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='a{sv}as', + out_signature='aa{sv}u') + def find(self, query, properties): + logging.debug('datastore.find %r' % query) + t = time.time() + + if layoutmanager.get_instance().index_updated: + try: + uids, count = self._index_store.find(query) + except Exception: + logging.error('Failed to query index, will rebuild\n%s' \ + % traceback.format_exc()) + layoutmanager.get_instance().index_updated = False + self._index_store.close_index() + self._index_store.remove_index() + self._index_store.open_index() + self._rebuild_index() + + if not layoutmanager.get_instance().index_updated: + logging.warning('Index updating, returning all entries') + + uids = layoutmanager.get_instance().find_all() + count = len(uids) + + offset = query.get('offset', 0) + limit = query.get('limit', MAX_QUERY_LIMIT) + uids = uids[offset:offset + limit] + + entries = [] + for uid in uids: + metadata = self._metadata_store.retrieve(uid, properties) + entries.append(metadata) + logger.debug('find(): %r' % (time.time() - t)) + return entries, count + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='s', + sender_keyword='sender') + def get_filename(self, uid, sender=None): + logging.debug('datastore.get_filename %r' % uid) + user_id = dbus.Bus().get_unix_user(sender) + extension = self._get_extension(uid) + return self._file_store.retrieve(uid, user_id, extension) + + def _get_extension(self, uid): + mime_type = self._metadata_store.get_property(uid, 'mime_type') + if mime_type is None or not mime_type: 
+ return '' + return mime.get_primary_extension(mime_type) + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='a{sv}') + def get_properties(self, uid): + logging.debug('datastore.get_properties %r' % uid) + metadata = self._metadata_store.retrieve(uid) + return metadata + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='sa{sv}', + out_signature='as') + def get_uniquevaluesfor(self, propertyname, query=None): + if propertyname != 'activity': + raise ValueError('Only ''activity'' is a supported property name') + if query: + raise ValueError('The query parameter is not supported') + if layoutmanager.get_instance().index_updated: + return self._index_store.get_activities() + else: + logging.warning('Index updating, returning an empty list') + return [] + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature='s', + out_signature='') + def delete(self, uid): + self._optimizer.remove(uid) + + self._index_store.delete(uid) + self._file_store.delete(uid) + self._metadata_store.delete(uid) + + entry_path = layoutmanager.get_instance().get_entry_path(uid) + os.removedirs(entry_path) + + self.Deleted(uid) + logger.debug("deleted %s" % uid) + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="s") + def Deleted(self, uid): + pass + + def stop(self): + """shutdown the service""" + self._index_store.close_index() + self.Stopped() + + @dbus.service.signal(DS_DBUS_INTERFACE) + def Stopped(self): + pass + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="sa{sv}", + out_signature='s') + def mount(self, uri, options=None): + return '' + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="", + out_signature="aa{sv}") + def mounts(self): + return [{'id': 1}] + + @dbus.service.method(DS_DBUS_INTERFACE, + in_signature="s", + out_signature="") + def unmount(self, mountpoint_id): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, signature="a{sv}") + def Mounted(self, descriptior): + pass + + @dbus.service.signal(DS_DBUS_INTERFACE, 
signature="a{sv}") + def Unmounted(self, descriptor): + pass + diff --git a/src/carquinyol/filestore.py b/src/carquinyol/filestore.py new file mode 100644 index 0000000..f88c531 --- /dev/null +++ b/src/carquinyol/filestore.py @@ -0,0 +1,230 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import errno +import logging +import tempfile + +import gobject + +from carquinyol import layoutmanager + +class FileStore(object): + """Handle the storage of one file per entry. + """ + # TODO: add protection against store and retrieve operations on entries + # that are being processed async. + + def store(self, uid, file_path, transfer_ownership, completion_cb): + """Store a file for a given entry. 
+ + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + destination_path = os.path.join(dir_path, 'data') + if file_path: + if not os.path.isfile(file_path): + raise ValueError('No file at %r' % file_path) + if transfer_ownership: + try: + logging.debug('FileStore moving from %r to %r' % \ + (file_path, destination_path)) + os.rename(file_path, destination_path) + completion_cb() + except OSError, e: + if e.errno == errno.EXDEV: + self._async_copy(file_path, destination_path, + completion_cb) + else: + raise + else: + self._async_copy(file_path, destination_path, completion_cb) + """ + TODO: How can we support deleting the file of an entry? + elif not file_path and os.path.exists(destination_path): + logging.debug('FileStore: deleting %r' % destination_path) + os.remove(destination_path) + completion_cb() + """ + else: + logging.debug('FileStore: Nothing to do') + completion_cb() + + def _async_copy(self, file_path, destination_path, completion_cb): + """Start copying a file asynchronously. + + """ + logging.debug('FileStore copying from %r to %r' % \ + (file_path, destination_path)) + async_copy = AsyncCopy(file_path, destination_path, completion_cb) + async_copy.start() + + def retrieve(self, uid, user_id, extension): + """Place the file associated to a given entry into a directory where the + user can read it. The caller is reponsible for deleting this file. 
+ + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + file_path = os.path.join(dir_path, 'data') + if not os.path.exists(file_path): + logging.debug('Entry %r doesnt have any file' % uid) + return '' + + use_instance_dir = os.path.exists('/etc/olpc-security') and \ + os.getuid() != user_id + if use_instance_dir: + if not user_id: + raise ValueError('Couldnt determine the current user uid.') + destination_dir = os.path.join(os.environ['HOME'], 'isolation', '1', + 'uid_to_instance_dir', str(user_id)) + else: + profile = os.environ.get('SUGAR_PROFILE', 'default') + destination_dir = os.path.join(os.path.expanduser('~'), '.sugar', + profile, 'data') + if not os.path.exists(destination_dir): + os.makedirs(destination_dir) + + if extension is None: + extension = '' + elif extension: + extension = '.' + extension + + destination_path = os.path.join(destination_dir, uid + extension) + + attempt = 1 + while os.path.exists(destination_path): + if attempt > 10: + fd_, destination_path = tempfile.mkstemp(prefix=uid, + suffix=extension, + dir=destination_dir) + del fd_ + os.unlink(destination_path) + break + else: + file_name = '%s_%s%s' % (uid, attempt, extension) + destination_path = os.path.join(destination_dir, file_name) + attempt += 1 + + # Try to hard link from the original file to the targetpath. This can + # fail if the file is in a different filesystem. Do a symlink instead. + try: + os.link(file_path, destination_path) + except OSError, e: + if e.errno == errno.EXDEV: + os.symlink(file_path, destination_path) + else: + raise + + # Try to make the original file readable. This can fail if the file is + # in a FAT filesystem. 
+ try: + os.chmod(file_path, 0604) + except OSError, e: + if e.errno != errno.EPERM: + raise + + return destination_path + + def get_file_path(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + return os.path.join(dir_path, 'data') + + def delete(self, uid): + """Remove the file associated to a given entry. + + """ + dir_path = layoutmanager.get_instance().get_entry_path(uid) + file_path = os.path.join(dir_path, 'data') + if os.path.exists(file_path): + os.remove(file_path) + + def hard_link_entry(self, new_uid, existing_uid): + existing_file = os.path.join( + layoutmanager.get_instance().get_entry_path(existing_uid), + 'data') + new_file = os.path.join( + layoutmanager.get_instance().get_entry_path(new_uid), + 'data') + + logging.debug('removing %r' % new_file) + os.remove(new_file) + + logging.debug('hard linking %r -> %r' % (new_file, existing_file)) + os.link(existing_file, new_file) + +class AsyncCopy(object): + """Copy a file in chunks in the idle loop. + + """ + CHUNK_SIZE = 65536 + + def __init__(self, src, dest, completion): + self.src = src + self.dest = dest + self.completion = completion + self.src_fp = -1 + self.dest_fp = -1 + self.written = 0 + self.size = 0 + + def _cleanup(self): + os.close(self.src_fp) + os.close(self.dest_fp) + + def _copy_block(self, user_data=None): + try: + data = os.read(self.src_fp, AsyncCopy.CHUNK_SIZE) + count = os.write(self.dest_fp, data) + self.written += len(data) + + # error writing data to file? + if count < len(data): + logging.error('AC: Error writing %s -> %s: wrote less than ' + 'expected' % (self.src, self.dest)) + self._cleanup() + self.completion(RuntimeError( + 'Error writing data to destination file')) + return False + + # FIXME: emit progress here + + # done? 
+ if len(data) < AsyncCopy.CHUNK_SIZE: + self._cleanup() + self.completion(None) + return False + except Exception, err: + logging.error("AC: Error copying %s -> %s: %r" % \ + (self.src, self.dest, err)) + self._cleanup() + self.completion(err) + return False + + return True + + def start(self): + self.src_fp = os.open(self.src, os.O_RDONLY) + self.dest_fp = os.open(self.dest, os.O_RDWR | os.O_TRUNC | os.O_CREAT, + 0644) + + stat = os.fstat(self.src_fp) + self.size = stat[6] + + gobject.idle_add(self._copy_block) + diff --git a/src/carquinyol/indexstore.py b/src/carquinyol/indexstore.py new file mode 100644 index 0000000..62aebb4 --- /dev/null +++ b/src/carquinyol/indexstore.py @@ -0,0 +1,238 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging +import os + +import gobject +import xapian +from xapian import WritableDatabase, Document, Enquire, Query, QueryParser + +from carquinyol import layoutmanager +from carquinyol.layoutmanager import MAX_QUERY_LIMIT + +_VALUE_UID = 0 +_VALUE_TIMESTAMP = 1 + +_PREFIX_UID = 'Q' +_PREFIX_ACTIVITY = 'A' +_PREFIX_ACTIVITY_ID = 'I' +_PREFIX_MIME_TYPE = 'M' +_PREFIX_KEEP = 'K' + +# Force a flush every _n_ changes to the db +_FLUSH_THRESHOLD = 20 + +# Force a flush after _n_ seconds since the last change to the db +_FLUSH_TIMEOUT = 60 + +_PROPERTIES_NOT_TO_INDEX = ['timestamp', 'activity_id', 'keep', 'preview'] + +_MAX_RESULTS = int(2 ** 31 - 1) + +class IndexStore(object): + """Index metadata and provide rich query facilities on it. + """ + def __init__(self): + self._database = None + self._flush_timeout = None + self._pending_writes = 0 + + def open_index(self): + index_path = layoutmanager.get_instance().get_index_path() + self._database = WritableDatabase(index_path, xapian.DB_CREATE_OR_OPEN) + + def close_index(self): + self._database.flush() + self._database = None + + def remove_index(self): + index_path = layoutmanager.get_instance().get_index_path() + if not os.path.exists(index_path): + return + for f in os.listdir(index_path): + os.remove(os.path.join(index_path, f)) + + def contains(self, uid): + postings = self._database.postlist(_PREFIX_UID + uid) + try: + postlist_item = postings.next() + except StopIteration: + return False + return True + + def store(self, uid, properties): + document = Document() + document.add_term(_PREFIX_UID + uid) + document.add_term(_PREFIX_ACTIVITY + properties.get('activity', '')) + document.add_term(_PREFIX_MIME_TYPE + properties.get('mime_type', '')) + document.add_term(_PREFIX_ACTIVITY_ID + + 
properties.get('activity_id', '')) + document.add_term(_PREFIX_KEEP + str(properties.get('keep', 0))) + + document.add_value(_VALUE_UID, uid) + document.add_value(_VALUE_TIMESTAMP, str(properties['timestamp'])) + + term_generator = xapian.TermGenerator() + + # TODO: we should do stemming, but in which language? + #if language is not None: + # term_generator.set_stemmer(_xapian.Stem(language)) + + # TODO: we should use a stopper + #if stop is not None: + # stopper = _xapian.SimpleStopper() + # for term in stop: + # stopper.add (term) + # term_generator.set_stopper (stopper) + + term_generator.set_document(document) + term_generator.index_text_without_positions( + self._extract_text(properties), 1, '') + + if not self.contains(uid): + self._database.add_document(document) + else: + self._database.replace_document(_PREFIX_UID + uid, document) + self._flush() + + def _extract_text(self, properties): + text = '' + for key, value in properties.items(): + if key not in _PROPERTIES_NOT_TO_INDEX: + if text: + text += ' ' + if isinstance(value, unicode): + value = value.encode('utf-8') + elif not isinstance(value, basestring): + value = str(value) + text += value + return text + + def find(self, query): + enquire = Enquire(self._database) + enquire.set_query(self._parse_query(query)) + + offset = query.get('offset', 0) + limit = query.get('limit', MAX_QUERY_LIMIT) + + # This will assure that the results count is exact. 
+ check_at_least = offset + limit + 1 + + enquire.set_sort_by_value(_VALUE_TIMESTAMP, True) + + query_result = enquire.get_mset(offset, limit, check_at_least) + total_count = query_result.get_matches_estimated() + + uids = [] + for hit in query_result: + uids.append(hit.document.get_value(_VALUE_UID)) + + return (uids, total_count) + + def _parse_query(self, query_dict): + logging.debug('_parse_query %r' % query_dict) + queries = [] + + query_str = query_dict.pop('query', None) + if query_str is not None: + query_parser = QueryParser() + query_parser.set_database(self._database) + #query_parser.set_default_op(Query.OP_AND) + + # TODO: we should do stemming, but in which language? + #query_parser.set_stemmer(_xapian.Stem(lang)) + #query_parser.set_stemming_strategy(qp.STEM_SOME) + + query = query_parser.parse_query( + query_str, + QueryParser.FLAG_PHRASE | + QueryParser.FLAG_BOOLEAN | + QueryParser.FLAG_LOVEHATE | + QueryParser.FLAG_WILDCARD, + '') + + queries.append(query) + + timestamp = query_dict.pop('timestamp', None) + if timestamp is not None: + start = str(timestamp.pop('start', 0)) + end = str(timestamp.pop('end', _MAX_RESULTS)) + query = Query(Query.OP_VALUE_RANGE, _VALUE_TIMESTAMP, start, end) + queries.append(query) + + uid = query_dict.pop('uid', None) + if uid is not None: + queries.append(Query(_PREFIX_UID + uid)) + + activity = query_dict.pop('activity', None) + if activity is not None: + queries.append(Query(_PREFIX_ACTIVITY + activity)) + + activity_id = query_dict.pop('activity_id', None) + if activity_id is not None: + query = Query(_PREFIX_ACTIVITY_ID + activity_id) + queries.append(query) + + keep = query_dict.pop('keep', None) + if keep is not None: + query = Query(_PREFIX_KEEP + str(keep)) + queries.append(query) + + mime_type = query_dict.pop('mime_type', None) + if mime_type is not None: + mime_queries = [] + for mime_type in mime_type: + mime_queries.append(Query(_PREFIX_MIME_TYPE + mime_type)) + queries.append(Query(Query.OP_OR, 
mime_queries)) + + if not queries: + queries.append(Query('')) + + if query_dict: + logging.warning('Unknown term(s): %r' % query_dict) + + return Query(Query.OP_AND, queries) + + def delete(self, uid): + self._database.delete_document(_PREFIX_UID + uid) + + def get_activities(self): + activities = [] + for term in self._database.allterms(_PREFIX_ACTIVITY): + activities.append(term.term[len(_PREFIX_ACTIVITY):]) + return activities + + def _flush_timeout_cb(self): + self._flush(True) + return False + + def _flush(self, force=False): + """Called after any database mutation""" + logging.debug('IndexStore.flush: %r %r' % (force, self._pending_writes)) + + if self._flush_timeout is not None: + gobject.source_remove(self._flush_timeout) + self._flush_timeout = None + + self._pending_writes += 1 + if force or self._pending_writes > _FLUSH_THRESHOLD: + self._database.flush() + self._pending_writes = 0 + else: + self._flush_timeout = gobject.timeout_add(_FLUSH_TIMEOUT * 1000, + self._flush_timeout_cb) + diff --git a/src/carquinyol/layoutmanager.py b/src/carquinyol/layoutmanager.py new file mode 100644 index 0000000..dc3fde6 --- /dev/null +++ b/src/carquinyol/layoutmanager.py @@ -0,0 +1,102 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import os + +MAX_QUERY_LIMIT = 40960 + +class LayoutManager(object): + """Provide the logic about how entries are stored inside the datastore + directory + """ + def __init__(self): + profile = os.environ.get('SUGAR_PROFILE', 'default') + base_dir = os.path.join(os.path.expanduser('~'), '.sugar', profile) + + self._root_path = os.path.join(base_dir, 'datastore') + + if not os.path.exists(self._root_path): + os.makedirs(self._root_path) + self.set_version(1) + + self._create_if_needed(self.get_checksums_dir()) + self._create_if_needed(self.get_queue_path()) + + index_updated_path = os.path.join(self._root_path, 'index_updated') + self._index_updated = os.path.exists(index_updated_path) + + def _create_if_needed(self, path): + if not os.path.exists(path): + os.makedirs(path) + + def get_version(self): + version_path = os.path.join(self._root_path, 'version') + version = 0 + if os.path.exists(version_path): + version = int(open(version_path, 'r').read()) + return version + + def set_version(self, version): + version_path = os.path.join(self._root_path, 'version') + open(version_path, 'w').write(str(version)) + + def get_entry_path(self, uid): + # os.path.join() is just too slow + return '%s/%s/%s' % (self._root_path, uid[:2], uid) + + def get_root_path(self): + return self._root_path + + def get_index_path(self): + return os.path.join(self._root_path, 'index') + + def get_checksums_dir(self): + return os.path.join(self._root_path, 'checksums') + + def get_queue_path(self): + return os.path.join(self.get_checksums_dir(), 'queue') + + def _is_index_updated(self): + return self._index_updated + + def _set_index_updated(self, index_updated): + if index_updated != self._index_updated: + self._index_updated = index_updated + + index_updated_path = 
os.path.join(self._root_path, 'index_updated') + if os.path.exists(index_updated_path): + os.remove(index_updated_path) + else: + open(index_updated_path, 'w').close() + + index_updated = property(_is_index_updated, _set_index_updated) + + def find_all(self): + uids = [] + for f in os.listdir(self._root_path): + if os.path.isdir(os.path.join(self._root_path, f)) and len(f) == 2: + for g in os.listdir(os.path.join(self._root_path, f)): + if len(g) == 36: + uids.append(g) + return uids + +_instance = None +def get_instance(): + global _instance + if _instance is None: + _instance = LayoutManager() + return _instance + diff --git a/src/carquinyol/metadatareader.c b/src/carquinyol/metadatareader.c new file mode 100644 index 0000000..08be17e --- /dev/null +++ b/src/carquinyol/metadatareader.c @@ -0,0 +1,241 @@ +#include "Python.h" + +#include <dirent.h> + +// TODO: put it in a place where python can use it when writing metadata +#define MAX_PROPERTY_LENGTH 500 * 1024 + +static PyObject *byte_array_type = NULL; + +int +add_property(char *metadata_path, char *property_name, PyObject *dict, + int must_exist) +{ + int file_path_size; + char *file_path = NULL; + FILE *file = NULL; + long file_size; + char *value_buf = NULL; + PyObject *value = NULL; + struct stat file_stat; + + // Build path of the property file + file_path_size = strlen(metadata_path) + 1 + strlen(property_name) + 1; + file_path = PyMem_Malloc(file_path_size); + if (file_path == NULL) { + PyErr_NoMemory(); + return 0; + } + snprintf (file_path, file_path_size, "%s/%s", metadata_path, property_name); + + if ((!must_exist) && (stat(file_path, &file_stat) != 0)) { + PyMem_Free(file_path); + return; + } + + file = fopen(file_path, "r"); + if (file == NULL) { + char buf[256]; + snprintf(buf, sizeof(buf), "Cannot open property file %s: %s", + file_path, strerror(errno)); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + // Get file size + fseek (file, 0, SEEK_END); + file_size = ftell (file); + 
rewind (file); + + if (file_size == 0) { + // Empty property + fclose(file); + file = NULL; + + value = PyString_FromString(""); + if (value == NULL) { + PyErr_SetString(PyExc_ValueError, + "Failed to convert value to python string"); + goto cleanup; + } + } else { + if (file_size > MAX_PROPERTY_LENGTH) { + PyErr_SetString(PyExc_ValueError, "Property file too big"); + goto cleanup; + } + + // Read the whole file + value_buf = PyMem_Malloc(file_size); + if (value_buf == NULL) { + PyErr_NoMemory(); + goto cleanup; + } + long read_size = fread(value_buf, 1, file_size, file); + if (read_size < file_size) { + char buf[256]; + snprintf(buf, sizeof(buf), + "Error while reading property file %s", file_path); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + fclose(file); + file = NULL; + + // Convert value to dbus.ByteArray + PyObject *args = Py_BuildValue("(s#)", value_buf, file_size); + + PyMem_Free(value_buf); + value_buf = NULL; + + value = PyObject_CallObject(byte_array_type, args); + if (value == NULL) { + PyErr_SetString(PyExc_ValueError, + "Failed to convert value to dbus.ByteArray"); + goto cleanup; + } + } + + // Add property to the metadata dict + if (PyDict_SetItemString(dict, property_name, value) == -1) { + PyErr_SetString(PyExc_ValueError, + "Failed to add property to dictionary"); + goto cleanup; + } + + Py_DECREF(value); + PyMem_Free(file_path); + + return 1; + +cleanup: + if (file_path) { + PyMem_Free(file_path); + } + if (value_buf) { + PyMem_Free(value_buf); + } + if (file) { + fclose(file); + } + if (value) { + Py_DECREF(value); + } + return 0; +} + +static PyObject * +read_from_properties_list (char *metadata_path, PyObject *properties) +{ + PyObject *dict = PyDict_New(); + + int i; + for (i = 0; i < PyList_Size(properties); i++) { + PyObject *property = PyList_GetItem(properties, i); + char *property_name = PyString_AsString (property); + + if (add_property(metadata_path, property_name, dict, 0) == 0) + goto cleanup; + } + + return dict; 
+ +cleanup: + if (dict) { + Py_DECREF(dict); + } + return NULL; +} + +static PyObject * +read_all_properties (char *metadata_path) +{ + PyObject *dict = PyDict_New(); + DIR *dir_stream = NULL; + struct dirent *dir_entry = NULL; + + dir_stream = opendir (metadata_path); + if (dir_stream == NULL) { + char buf[256]; + snprintf(buf, sizeof(buf), "Couldn't open metadata directory %s", + metadata_path); + PyErr_SetString(PyExc_IOError, buf); + goto cleanup; + } + + dir_entry = readdir(dir_stream); + while (dir_entry != NULL) { + // Skip . and .. + if (dir_entry->d_name[0] == '.' && + (strlen(dir_entry->d_name) == 1 || + (dir_entry->d_name[1] == '.' && + strlen(dir_entry->d_name) == 2))) + goto next_property; + + if (add_property(metadata_path, dir_entry->d_name, dict, 1) == 0) + goto cleanup; + + next_property: + dir_entry = readdir(dir_stream); + } + + closedir(dir_stream); + + return dict; + +cleanup: + if (dict) { + Py_DECREF(dict); + } + if (dir_stream) { + closedir(dir_stream); + } + return NULL; +} + +static PyObject * +metadatareader_retrieve(PyObject *unused, PyObject *args) +{ + PyObject *dict = NULL; + PyObject *properties = NULL; + const char *dir_path = NULL; + char *metadata_path = NULL; + + if (!PyArg_ParseTuple(args, "sO:retrieve", &dir_path, &properties)) + return NULL; + + // Build path to the metadata directory + int metadata_path_size = strlen(dir_path) + 10; + metadata_path = PyMem_Malloc(metadata_path_size); + if (metadata_path == NULL) { + PyErr_NoMemory(); + return NULL; + } + snprintf (metadata_path, metadata_path_size, "%s/%s", dir_path, "metadata"); + + if ((properties != Py_None) && (PyList_Size(properties) > 0)) { + dict = read_from_properties_list(metadata_path, properties); + } else { + dict = read_all_properties(metadata_path); + } + + PyMem_Free(metadata_path); + + return dict; +} + +static PyMethodDef metadatareader_functions[] = { + {"retrieve", metadatareader_retrieve, METH_VARARGS, PyDoc_STR("Read a dictionary from a file")}, + {NULL, 
NULL, 0, NULL} +}; + +PyMODINIT_FUNC +initmetadatareader(void) +{ + PyObject* mod; + mod = Py_InitModule("metadatareader", metadatareader_functions); + + PyObject *dbus_module = PyImport_ImportModule("dbus"); + byte_array_type = PyObject_GetAttrString(dbus_module, "ByteArray"); +} + diff --git a/src/carquinyol/metadatastore.py b/src/carquinyol/metadatastore.py new file mode 100644 index 0000000..50981f3 --- /dev/null +++ b/src/carquinyol/metadatastore.py @@ -0,0 +1,64 @@ +import os + +from carquinyol import layoutmanager +from carquinyol import metadatareader + +MAX_SIZE = 256 + +class MetadataStore(object): + def store(self, uid, metadata): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + metadata_path = os.path.join(dir_path, 'metadata') + if not os.path.exists(metadata_path): + os.makedirs(metadata_path) + else: + for key in os.listdir(metadata_path): + os.remove(os.path.join(metadata_path, key)) + + metadata['uid'] = uid + for key, value in metadata.items(): + + # Hack to support activities that still pass properties named as for + # example title:text. 
+ if ':' in key: + key = key.split(':', 1)[0] + + f = open(os.path.join(metadata_path, key), 'w') + try: + if isinstance(value, unicode): + value = value.encode('utf-8') + elif not isinstance(value, basestring): + value = str(value) + f.write(value) + finally: + f.close() + + def retrieve(self, uid, properties=None): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + return metadatareader.retrieve(dir_path, properties) + + def delete(self, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + for key in os.listdir(metadata_path): + os.remove(os.path.join(metadata_path, key)) + os.rmdir(metadata_path) + + def get_property(self, uid, key): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + if os.path.exists(property_path): + return open(property_path, 'r').read() + else: + return None + + def set_property(self, uid, key, value): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + property_path = os.path.join(metadata_path, key) + open(property_path, 'w').write(value) + diff --git a/src/carquinyol/migration.py b/src/carquinyol/migration.py new file mode 100644 index 0000000..228db2a --- /dev/null +++ b/src/carquinyol/migration.py @@ -0,0 +1,102 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Transform one DataStore directory in a newer format.
+"""
+
+import os
+import logging
+import shutil
+import time
+import traceback
+
+import cjson
+
+from carquinyol import layoutmanager
+
+# Timestamp format used by the old (version 0) 'mtime' property.
+DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
+
+def migrate_from_0():
+    # Migrate the old flat 'store' directory (one <uid>.metadata file
+    # per entry) into the per-entry layout; skips cleanly when the old
+    # directory no longer exists.
+    logging.info('Migrating datastore from version 0 to version 1')
+
+    root_path = layoutmanager.get_instance().get_root_path()
+    old_root_path = os.path.join(root_path, 'store')
+    if not os.path.exists(old_root_path):
+        return
+
+    for f in os.listdir(old_root_path):
+        uid, ext = os.path.splitext(f)
+        if ext != '.metadata':
+            continue
+
+        logging.debug('Migrating entry %r' % uid)
+        # A failure on one entry is logged and must not abort the
+        # migration of the remaining entries.
+        try:
+            _migrate_metadata(root_path, old_root_path, uid)
+            _migrate_file(root_path, old_root_path, uid)
+            _migrate_preview(root_path, old_root_path, uid)
+        except Exception:
+            logging.error('Error while migrating entry %r: %s\n' % \
+                (uid, traceback.format_exc()))
+
+        # Just be paranoid, it's cheap.
+ if old_root_path.endswith('datastore/store'): + shutil.rmtree(old_root_path) + + logging.info('Migration finished') + +def _migrate_metadata(root_path, old_root_path, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + os.makedirs(metadata_path) + + old_metadata_path = os.path.join(old_root_path, uid + '.metadata') + metadata = cjson.decode(open(old_metadata_path, 'r').read()) + + if 'uid' not in metadata: + metadata['uid'] = uid + + if 'timestamp' not in metadata and 'mtime' in metadata: + metadata['timestamp'] = \ + time.mktime(time.strptime(metadata['mtime'], DATE_FORMAT)) + + for key, value in metadata.items(): + try: + f = open(os.path.join(metadata_path, key), 'w') + try: + if isinstance(value, unicode): + value = value.encode('utf-8') + if not isinstance(value, basestring): + value = str(value) + f.write(value) + finally: + f.close() + except Exception: + logging.error( + 'Error while migrating property %s of entry %s: %s\n' % \ + (key, uid, traceback.format_exc())) + +def _migrate_file(root_path, old_root_path, uid): + if os.path.exists(os.path.join(old_root_path, uid)): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + os.rename(os.path.join(old_root_path, uid), + os.path.join(dir_path, 'data')) + +def _migrate_preview(root_path, old_root_path, uid): + dir_path = layoutmanager.get_instance().get_entry_path(uid) + metadata_path = os.path.join(dir_path, 'metadata') + os.rename(os.path.join(old_root_path, 'preview', uid), + os.path.join(metadata_path, 'preview')) + diff --git a/src/carquinyol/optimizer.py b/src/carquinyol/optimizer.py new file mode 100644 index 0000000..f8a2e3e --- /dev/null +++ b/src/carquinyol/optimizer.py @@ -0,0 +1,166 @@ +# Copyright (C) 2008, One Laptop Per Child +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either 
version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import errno
+import logging
+import subprocess
+
+import gobject
+
+from carquinyol import layoutmanager
+
+class Optimizer(object):
+    """Optimizes disk space usage by detecting duplicates and sharing storage.
+    """
+    def __init__(self, file_store, metadata_store):
+        # Stores used to locate entry files and to read/write the
+        # per-entry 'checksum' property.
+        self._file_store = file_store
+        self._metadata_store = metadata_store
+        # gobject idle-source id while the checksum queue is being
+        # drained; None when no processing is scheduled.
+        self._enqueue_checksum_id = None
+
+    def optimize(self, uid):
+        """Add an entry to a queue of entries to be checked for duplicates.
+
+        """
+        # Entries without a data file have nothing to deduplicate.
+        if not os.path.exists(self._file_store.get_file_path(uid)):
+            return
+
+        # The queue is a directory of empty files named by uid.
+        queue_path = layoutmanager.get_instance().get_queue_path()
+        open(os.path.join(queue_path, uid), 'w').close()
+        logging.debug('optimize %r' % os.path.join(queue_path, uid))
+
+        # Schedule low-priority idle processing unless already running.
+        if self._enqueue_checksum_id is None:
+            self._enqueue_checksum_id = \
+                    gobject.idle_add(self._process_entry_cb,
+                                     priority=gobject.PRIORITY_LOW)
+
+    def remove(self, uid):
+        """Remove any structures left from space optimization
+
+        """
+        checksum = self._metadata_store.get_property(uid, 'checksum')
+        if checksum is None:
+            return
+
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+        checksum_entry_path = os.path.join(checksum_path, uid)
+
+        if os.path.exists(checksum_entry_path):
+            logging.debug('remove %r' % checksum_entry_path)
+            os.remove(checksum_entry_path)
+
+        # Drop the per-checksum directory once its last entry is gone;
+        # a still-populated directory is expected and not an error.
+        if os.path.exists(checksum_path):
+            try:
+                os.rmdir(checksum_path)
+                logging.debug('removed %r' % checksum_path)
+            except OSError, e:
+                if e.errno != errno.ENOTEMPTY:
+                    raise
+
+    def _identical_file_already_exists(self, checksum):
+        """Check if we already have files with this checksum.
+
+        """
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+        return os.path.exists(checksum_path)
+
+    def _get_uid_from_checksum(self, checksum):
+        """Get an existing entry which file matches checksum.
+
+        """
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+        # NOTE(review): assumes the checksum dir is non-empty; callers
+        # gate on _identical_file_already_exists(), but an existing yet
+        # empty dir would raise IndexError here — confirm acceptable.
+        first_uid = os.listdir(checksum_path)[0]
+        return first_uid
+
+    def _create_checksum_dir(self, checksum):
+        """Create directory that tracks files with this same checksum.
+
+        """
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+        logging.debug('create dir %r' % checksum_path)
+        os.mkdir(checksum_path)
+
+    def _add_checksum_entry(self, uid, checksum):
+        """Create a file in the checksum dir with the uid of the entry
+
+        """
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+
+        # The file is an empty marker; only its name (the uid) matters.
+        logging.debug('touch %r' % os.path.join(checksum_path, uid))
+        open(os.path.join(checksum_path, uid), 'w').close()
+
+    def _already_linked(self, uid, checksum):
+        """Check if this entry's file is already a hard link to the checksums
+        dir.
+
+        """
+        checksums_dir = layoutmanager.get_instance().get_checksums_dir()
+        checksum_path = os.path.join(checksums_dir, checksum)
+        return os.path.exists(os.path.join(checksum_path, uid))
+
+    def _process_entry_cb(self):
+        """Process one item in the checksums queue by calculating its checksum,
+        checking if there exist already an identical file, and in that case
+        substituting its file with a hard link to that pre-existing file.
+ + """ + queue_path = layoutmanager.get_instance().get_queue_path() + queue = os.listdir(queue_path) + if queue: + uid = queue[0] + logging.debug('_process_entry_cb processing %r' % uid) + + file_in_entry_path = self._file_store.get_file_path(uid) + if not os.path.exists(file_in_entry_path): + logging.info('non-existent entry in queue: %r' % uid) + else: + checksum = self._calculate_md5sum(file_in_entry_path) + self._metadata_store.set_property(uid, 'checksum', checksum) + + if self._identical_file_already_exists(checksum): + if not self._already_linked(uid, checksum): + existing_entry_uid = \ + self._get_uid_from_checksum(checksum) + + self._file_store.hard_link_entry(uid, + existing_entry_uid) + + self._add_checksum_entry(uid, checksum) + else: + self._create_checksum_dir(checksum) + self._add_checksum_entry(uid, checksum) + + os.remove(os.path.join(queue_path, uid)) + + if len(queue) <= 1: + self._enqueue_checksum_id = None + return False + else: + return True + + def _calculate_md5sum(self, path): + """Calculate the md5 checksum of a given file. + + """ + popen = subprocess.Popen(['md5sum', path], stdout=subprocess.PIPE) + stdout, stderr_ = popen.communicate() + return stdout.split(' ', 1)[0] + |