#
# Author: Sascha Silbe
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Gdatastore D-Bus service API
"""

import ast
import hashlib
import logging
import os
import pprint
import re
import shutil
from subprocess import Popen, PIPE
import tempfile
import time
import uuid

import dbus
import dbus.service
import gconf

try:
    from sugar import mime as sugar_mime
except ImportError:
    # Only used for helping legacy applications that use the file
    # extension rather than the MIME type
    sugar_mime = None

from gdatastore.index import Index


DBUS_SERVICE_NATIVE_V1 = 'org.silbe.GDataStore'
DBUS_INTERFACE_NATIVE_V1 = 'org.silbe.GDataStore1'
DBUS_PATH_NATIVE_V1 = '/org/silbe/GDataStore1'

DBUS_SERVICE_SUGAR_V2 = 'org.laptop.sugar.DataStore'
DBUS_INTERFACE_SUGAR_V2 = 'org.laptop.sugar.DataStore'
DBUS_PATH_SUGAR_V2 = '/org/laptop/sugar/DataStore'

DBUS_SERVICE_SUGAR_V3 = 'org.laptop.sugar.DataStore'
DBUS_INTERFACE_SUGAR_V3 = 'org.laptop.sugar.DataStore2'
DBUS_PATH_SUGAR_V3 = '/org/laptop/sugar/DataStore2'

# The named group 'value' captures the plain Python literal wrapped by the
# dbus type constructor.
_DBUS_METADATA_BASIC_RE = re.compile(
    r"""dbus.(U?Int(16|32|64)|Double|String|ByteArray)\((?P<value>(-?[0-9]+(\.[0-9]*)?)|(u?('([^'\\]|\\.)*'|"([^"\\]|\\.)*")))(, variant_level=[0-9]+)?\)""")
_DBUS_METADATA_DICTIONARY_RE = re.compile(
    r"""dbus.Dictionary\((?P<value>\{.*\}), signature=dbus.Signature\('s[sv]'\)\)""")


class DataStoreError(Exception):
    pass


class GitError(DataStoreError):

    def __init__(self, returncode, stderr):
        self.returncode = returncode
        self.stderr = unicode(stderr)
        Exception.__init__(self)

    def __unicode__(self):
        return u'Git returned with exit code #%d: %s' % (self.returncode,
                                                         self.stderr)

    def __str__(self):
        return self.__unicode__()


class DBusApiNativeV1(dbus.service.Object):
    """Native gdatastore D-Bus API
    """

    def __init__(self, internal_api):
        self._internal_api = internal_api
        bus_name = dbus.service.BusName(DBUS_SERVICE_NATIVE_V1,
                                        bus=dbus.SessionBus(),
                                        replace_existing=False,
                                        allow_replacement=False,
                                        do_not_queue=True)
        dbus.service.Object.__init__(self, bus_name, DBUS_PATH_NATIVE_V1)
        self._internal_api.add_callback('change_metadata',
                                        self.__change_metadata_cb)
        self._internal_api.add_callback('delete', self.__delete_cb)
        self._internal_api.add_callback('save', self.__save_cb)

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='sssa{sv}')
    def AddedNewVersion(self, tree_id, child_id, parent_id, metadata):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ssa{sv}')
    def Created(self, tree_id, child_id, metadata):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ssa{sv}')
    def ChangedMetadata(self, tree_id, version_id, metadata):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ss')
    def Deleted(self, tree_id, version_id):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}ay', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create(self, metadata, data_path, async_cb, async_err_cb):
        """
        - add new entry, assign ids
        - data='' indicates no data to store - bad design? (data OOB)
        """
        # TODO: what about transfer_ownership/delete_after?
        self._internal_api.save(tree_id='', parent_id='', metadata=metadata,
                                path=data_path, delete_after=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}h', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create_fd(self, metadata, data_fd, async_cb, async_err_cb):
        """
        - add new entry, assign ids
        """
        # FIXME: avoid unnecessary copy, instead change
        # InternalAPI.save() to take fd and adapt existing callers.
        tmp_file = tempfile.NamedTemporaryFile(delete=False)
        with os.fdopen(data_fd.take(), 'rb') as data_file:
            shutil.copyfileobj(data_file, tmp_file)
        tmp_file.flush()
        self._internal_api.save(tree_id='', parent_id='', metadata=metadata,
                                path=tmp_file.name, delete_after=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}ay', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def add_version(self, tree_id, parent_id, metadata, data_path,
                    async_cb, async_err_cb):
        """
        - add new version to existing object
        """
        def success_cb(tree_id, child_id):
            async_cb(child_id)

        if not tree_id:
            raise ValueError('No tree_id given')
        if not parent_id:
            raise ValueError('No parent_id given')

        self._internal_api.save(tree_id, parent_id, metadata, data_path,
                                delete_after=True, async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}h', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def add_version_fd(self, tree_id, parent_id, metadata, data_fd,
                       async_cb, async_err_cb):
        """
        - add new version to existing object
        """
        def success_cb(tree_id, child_id):
            async_cb(child_id)

        if not tree_id:
            raise ValueError('No tree_id given')
        if not parent_id:
            raise ValueError('No parent_id given')

        # FIXME: avoid unnecessary copy, instead change
        # InternalAPI.save() to take fd and adapt existing callers.
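        # Drain the received file descriptor into a named temporary file so
        # InternalAPI.save() can operate on a path.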
        tmp_file = tempfile.NamedTemporaryFile(delete=False)
        with os.fdopen(data_fd.take(), 'rb') as data_file:
            shutil.copyfileobj(data_file, tmp_file)
        tmp_file.flush()
        self._internal_api.save(tree_id, parent_id, metadata,
                                path=tmp_file.name, delete_after=True,
                                async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}', out_signature='',
                         byte_arrays=True)
    def change_metadata(self, tree_id, version_id, metadata):
        """
        - change the metadata of an existing version
        """
        object_id = (tree_id, version_id)
        self._internal_api.change_metadata(object_id, metadata)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='')
    def delete(self, tree_id, version_id):
        object_id = (tree_id, version_id)
        self._internal_api.delete(object_id)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}a{sv}', out_signature='aa{sv}u',
                         byte_arrays=True)
    def find(self, query_dict, options):
        return self._internal_api.find(query_dict, options)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='ay',
                         sender_keyword='sender')
    def get_data_path(self, tree_id, version_id, sender=None):
        object_id = (tree_id, version_id)
        return self._internal_api.get_data_path(object_id, sender=sender)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='h',
                         sender_keyword='sender')
    def get_data_fd(self, tree_id, version_id, sender=None):
        object_id = (tree_id, version_id)
        path = self._internal_api.get_data_path(object_id, sender=sender)
        return os.open(path, os.O_RDONLY)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='a{sv}')
    def get_metadata(self, tree_id, version_id):
        object_id = (tree_id, version_id)
        return self._internal_api.get_properties(object_id)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}sa{sv}', out_signature='aa{sv}u',
                         byte_arrays=True)
    def text_search(self, query_dict, query_string, options):
        return self._internal_api.find(query_dict, options, query_string)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='sssa{sv}ay', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def restore(self, tree_id, parent_id, version_id, metadata, data_path,
                async_cb, async_err_cb):
        """
        - add a new version with the given ids
        - there must be no existing entry with the same
          (tree_id, version_id)
        """
        if not tree_id:
            raise ValueError('No tree_id given')
        metadata['version_id'] = version_id
        self._internal_api.save(tree_id, parent_id, metadata, data_path,
                                delete_after=True, allow_new_parent=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='sssa{sv}h', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def restore_fd(self, tree_id, parent_id, version_id, metadata, data_fd,
                   async_cb, async_err_cb):
        """
        - add a new version with the given ids
        - there must be no existing entry with the same
          (tree_id, version_id)
        """
        if not tree_id:
            raise ValueError('No tree_id given')
        metadata['version_id'] = version_id
        # FIXME: avoid unnecessary copy, instead change
        # InternalAPI.save() to take fd and adapt existing callers.
        tmp_file = tempfile.NamedTemporaryFile(delete=False)
        with os.fdopen(data_fd.take(), 'rb') as data_file:
            shutil.copyfileobj(data_file, tmp_file)
        tmp_file.flush()
        self._internal_api.save(tree_id, parent_id, metadata, tmp_file.name,
                                delete_after=True, allow_new_parent=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    def __change_metadata_cb(self, (tree_id, version_id), metadata):
        self.ChangedMetadata(tree_id, version_id, metadata)

    def __delete_cb(self, (tree_id, version_id)):
        self.Deleted(tree_id, version_id)

    def __save_cb(self, tree_id, child_id, parent_id, metadata):
        if parent_id:
            self.AddedNewVersion(tree_id, child_id, parent_id, metadata)
        else:
            self.Created(tree_id, child_id, metadata)


class DBusApiSugarV2(dbus.service.Object):
    """Compatibility layer for the Sugar 0.84+ data store D-Bus API
    """

    def __init__(self, internal_api):
        self._internal_api = internal_api
        bus_name = dbus.service.BusName(DBUS_SERVICE_SUGAR_V2,
                                        bus=dbus.SessionBus(),
                                        replace_existing=False,
                                        allow_replacement=False,
                                        do_not_queue=True)
        dbus.service.Object.__init__(self, bus_name, DBUS_PATH_SUGAR_V2)
        self._internal_api.add_callback('change_metadata',
                                        self.__change_metadata_cb)
        self._internal_api.add_callback('delete', self.__delete_cb)
        self._internal_api.add_callback('save', self.__save_cb)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='a{sv}sb', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create(self, props, file_path, transfer_ownership,
               async_cb, async_err_cb):
        def success_cb(tree_id, child_id):
            async_cb(tree_id)

        self._internal_api.save(tree_id='', parent_id='', metadata=props,
                                path=file_path.encode('utf-8'),
                                delete_after=transfer_ownership,
                                async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Created(self, uid):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='sa{sv}sb', out_signature='',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def update(self, uid, props, file_path, transfer_ownership,
               async_cb, async_err_cb):
        def success_cb(tree_id, child_id):
            async_cb()

        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Trying to update non-existent entry %s - wanted'
                             ' to use create()?'
                             % (uid, ))

        parent = latest_versions[0]
        object_id = parent['tree_id'], parent['version_id']
        if self._check_identical(parent, file_path):
            self._internal_api.change_metadata(object_id, props)
            return success_cb(uid, None)

        self._internal_api.save(tree_id=uid,
                                parent_id=parent['version_id'],
                                metadata=props,
                                path=file_path.encode('utf-8'),
                                delete_after=transfer_ownership,
                                async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Updated(self, uid):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='a{sv}as', out_signature='aa{sv}u')
    def find(self, query, properties):
        if 'uid' in properties:
            properties.append('tree_id')
            properties.remove('uid')

        options = {'metadata': properties}
        for name in ['offset', 'limit', 'order_by']:
            if name in query:
                options[name] = query.pop(name)
        if 'uid' in query:
            query['tree_id'] = query.pop('uid')

        results, count = self._internal_api.find(query, options,
                                                 query.pop('query', None))

        if not properties or 'tree_id' in properties:
            for entry in results:
                entry['uid'] = entry.pop('tree_id')

        return results, count

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='s',
                         sender_keyword='sender')
    def get_filename(self, uid, sender=None):
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))
        object_id = (uid, latest_versions[0]['version_id'])
        return self._internal_api.get_data_path(object_id, sender=sender)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='a{sv}')
    def get_properties(self, uid):
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))
        latest_versions[0]['uid'] = latest_versions[0].pop('tree_id')
        del latest_versions[0]['version_id']
        return latest_versions[0]

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='sa{sv}', out_signature='as')
    def get_uniquevaluesfor(self, propertyname, query=None):
        return self._internal_api.find_unique_values(query, propertyname)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='')
    def delete(self, uid):
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))
        self._internal_api.delete((uid, latest_versions[0]['version_id']))

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Deleted(self, uid):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='', out_signature='aa{sv}')
    def mounts(self):
        return [{'id': 1}]

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='a{sv}')
    def Mounted(self, descriptor):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='a{sv}')
    def Unmounted(self, descriptor):
        # pylint: disable-msg=C0103
        pass

    def _get_latest(self, uid):
        return self._internal_api.find({'tree_id': uid},
                                       {'limit': 1,
                                        'order_by': ['+timestamp']})[0]

    def _check_identical(self, parent, child_data_path):
        """Check whether the new version contains the same data as the parent

        If child_data_path is empty, but the parent contains data, that's
        interpreted as wanting to do a metadata-only update (emulating
        sugar-datastore behaviour).
        """
        parent_object_id = (parent['tree_id'], parent['version_id'])
        parent_data_path = self._internal_api.get_data_path(parent_object_id)
        if not child_data_path:
            return True
        elif child_data_path and not parent_data_path:
            return False
        # TODO: compare checksums?
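        # Both versions carry data; without a checksum comparison we
        # conservatively assume the data changed and store a new version.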
        return False

    def __change_metadata_cb(self, (tree_id, version_id), metadata):
        self.Updated(tree_id)

    def __delete_cb(self, (tree_id, version_id)):
        if self._get_latest(tree_id):
            self.Updated(tree_id)
        else:
            self.Deleted(tree_id)

    def __save_cb(self, tree_id, child_id, parent_id, metadata):
        if parent_id:
            self.Updated(tree_id)
        else:
            self.Created(tree_id)


class InternalApi(object):

    SIGNALS = ['change_metadata', 'delete', 'save']

    def __init__(self, base_dir):
        self._base_dir = base_dir
        self._callbacks = {}
        self._checkouts_dir = os.path.join(base_dir, 'checkouts')
        if not os.path.exists(self._checkouts_dir):
            os.makedirs(self._checkouts_dir)
        self._git_dir = os.path.join(base_dir, 'git')
        self._git_env = {}
        gconf_client = gconf.client_get_default()
        self._max_versions = gconf_client.get_int(
            '/desktop/sugar/datastore/max_versions')
        logging.debug('max_versions=%r', self._max_versions)
        data_stores = [{'name': '',
                        'index_dir': os.path.join(self._base_dir, 'index'),
                        'git_dir': os.path.join(self._base_dir, 'git')}]
        extras_dir = os.path.join(self._base_dir, 'extra-stores')
        if not os.path.isdir(extras_dir):
            os.makedirs(extras_dir)
        for extra_name in os.listdir(extras_dir):
            extra_dir = os.path.realpath(os.path.join(extras_dir, extra_name))
            data_stores.append({'name': extra_name,
                                'index_dir': os.path.join(extra_dir, 'index'),
                                'git_dir': os.path.join(extra_dir, 'git')})
        self._index = Index(data_stores)
        self._migrate()
        self._check_reindex()
        logging.info('ready')

    def add_callback(self, signal, callback):
        if signal not in InternalApi.SIGNALS:
            raise ValueError('Invalid signal %r' % (signal, ))
        self._callbacks.setdefault(signal, []).append(callback)

    def change_metadata(self, object_id, metadata):
        logging.debug('change_metadata(%r, %r)', object_id, metadata)
        metadata['tree_id'], metadata['version_id'] = object_id
        if 'creation_time' not in metadata:
            old_metadata = self._index.retrieve(object_id)['metadata']
            metadata['creation_time'] = old_metadata['creation_time']
        self._index.store(object_id, metadata)
        self._invoke_callbacks('change_metadata', object_id, metadata)

    def delete(self, object_id):
        logging.debug('delete(%r)', object_id)
        self._index.delete(object_id)
        self._git_call('update-ref', ['-d', _format_ref(*object_id)])
        self._invoke_callbacks('delete', object_id)

    def get_data_path(self, (tree_id, version_id), sender=None):
        logging.debug('get_data_path((%r, %r), %r)', tree_id, version_id,
                      sender)
        entry = self._index.retrieve((tree_id, version_id))
        metadata = entry['metadata']
        git_dir = entry['data_store']['git_dir']
        ref_name = _format_ref(tree_id, version_id)
        top_level_entries = self._git_call('ls-tree', [ref_name],
                                           git_dir=git_dir).splitlines()
        if len(top_level_entries) == 1 and \
           top_level_entries[0].endswith('\tdata'):
            blob_hash = top_level_entries[0].split('\t')[0].split(' ')[2]
            mime_type = metadata.get('mime_type', '')
            return self._checkout_file(blob_hash, git_dir,
                                       suffix=_guess_extension(mime_type))
        return self._checkout_dir(ref_name, git_dir)

    def find(self, query_dict, options, query_string=None):
        logging.debug('find(%r, %r, %r)', query_dict, options, query_string)
        entries, total_count = self._index.find(query_dict, query_string,
                                                options)
        #logging.debug('object_ids=%r', object_ids)
        property_names = options.pop('metadata', None)
        for entry in entries:
            for name in entry.keys():
                if property_names and name not in property_names:
                    del entry[name]
                elif isinstance(entry[name], str):
                    entry[name] = dbus.ByteArray(entry[name])
        return entries, total_count

    def find_unique_values(self, query, name):
        logging.debug('find_unique_values(%r, %r)', query, name)
        if query:
            raise NotImplementedError('non-empty query not supported yet')
        return self._index.find_unique_values(name)

    def get_properties(self, object_id):
        return self._index.retrieve(object_id)['metadata']

    def save(self, tree_id, parent_id, metadata, path, delete_after,
             async_cb, async_err_cb, allow_new_parent=False):
        logging.debug('save(%r, %r, %r, %r, %r)', tree_id, parent_id,
                      metadata, path, delete_after)

        if path:
            path = os.path.realpath(path)
            if not os.access(path, os.R_OK):
                raise ValueError('Invalid path given.')
            # FIXME: requesting deletion of the _symlink_ is OK, we
            # only need to take care to delete the symlink rather than
            # the symlink target
            if delete_after and not os.access(os.path.dirname(path), os.W_OK):
                raise ValueError('Deletion requested for read-only directory')

        if (not tree_id) and parent_id:
            raise ValueError('tree_id is empty but parent_id is not')

        if tree_id and not parent_id and not allow_new_parent:
            if self.find({'tree_id': tree_id}, {'limit': 1})[1]:
                raise ValueError('No parent_id given but tree_id already '
                                 'exists')

        if not tree_id:
            tree_id = self._gen_uuid()

        child_id = metadata.get('version_id')
        if not child_id:
            child_id = self._gen_uuid()
        elif not tree_id:
            raise ValueError('No tree_id given but metadata contains'
                             ' version_id')
        elif self._index.contains((tree_id, child_id)):
            raise ValueError('There is an existing entry with the same tree_id'
                             ' and version_id')

        if 'timestamp' not in metadata:
            metadata['timestamp'] = time.time()

        if 'creation_time' not in metadata:
            metadata['creation_time'] = metadata['timestamp']

        if os.path.isfile(path):
            metadata['filesize'] = str(os.stat(path).st_size)
        elif not path:
            metadata['filesize'] = '0'

        tree_id = str(tree_id)
        parent_id = str(parent_id)
        child_id = str(child_id)
        metadata['tree_id'] = tree_id
        metadata['version_id'] = child_id

        # TODO: check metadata for validity first (index?)
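        # Record the object id before touching git and the index so that an
        # interrupted save is detected by _check_reindex() on the next
        # start-up.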
        self._log_store((tree_id, child_id))
        self._store_entry_in_git(tree_id, child_id, parent_id, path, metadata)
        self._index.store((tree_id, child_id), metadata)
        self._invoke_callbacks('save', tree_id, child_id, parent_id, metadata)
        if delete_after and path:
            os.remove(path)
        async_cb(tree_id, child_id)

    def stop(self):
        logging.debug('stop()')
        self._index.close()

    def _add_to_git_index(self, index_path, path):
        if os.path.isdir(path):
            self._git_call('add', ['-A'], work_dir=path,
                           index_path=index_path)
        elif os.path.isfile(path):
            object_hash = self._git_call('hash-object', ['-w', path]).strip()
            mode = os.stat(path).st_mode
            self._git_call('update-index',
                           ['--add', '--cacheinfo', oct(mode), object_hash,
                            'data'],
                           index_path=index_path)
        else:
            raise DataStoreError('Refusing to store special object %r'
                                 % (path, ))

    def _check_max_versions(self, tree_id):
        if not self._max_versions:
            return
        options = {'all_versions': True, 'offset': self._max_versions,
                   'metadata': ['tree_id', 'version_id', 'timestamp'],
                   'order_by': ['+timestamp']}
        old_versions = self.find({'tree_id': tree_id}, options)[0]
        logging.info('Deleting old versions: %r', old_versions)
        for entry in old_versions:
            self.delete((entry['tree_id'], entry['version_id']))

    def _checkout_file(self, blob_hash, git_dir, suffix=''):
        fd, file_name = tempfile.mkstemp(dir=self._checkouts_dir,
                                         suffix=suffix)
        try:
            self._git_call('cat-file', ['blob', blob_hash], git_dir=git_dir,
                           stdout_fd=fd)
        finally:
            os.close(fd)
        return file_name

    def _checkout_dir(self, ref_name, git_dir):
        # FIXME
        return ''

    def _create_repo(self):
        os.makedirs(self._git_dir)
        self._git_call('init', ['-q', '--bare'])

    def _migrate(self):
        if not os.path.exists(self._git_dir):
            return self._create_repo()

    def _check_reindex(self):
        """Recreate or update index if necessary
        """
        last_object_id = self._get_last_object_id_from_log()
        # Non-existence of the log (i.e. last_object_id=None) does not
        # necessarily mean an empty data store: We could be upgrading
        # from a previous version that didn't write the file, a file
        # system corruption may have occurred or the user may have
        # deleted the log to force reindexing. This operation is cheap
        # enough on empty data stores that we don't care about the
        # performance impact on valid, empty data stores.
        if not last_object_id or not self._index.contains(last_object_id):
            logging.info('Rebuilding index')
            self._reindex()

    def _reindex(self):
        """Recreate or update index from git repository

        Log the last object after finishing the rebuild.
        """
        last_object_id = None
        for object_id in self._get_object_ids_from_git():
            last_object_id = object_id
            logging.debug('reindex(): checking entry %r', object_id)
            if self._index.contains(object_id):
                continue
            logging.debug('reindex(): adding entry %r from git', object_id)
            metadata = self._get_metadata_from_git(object_id)
            self._index.store(object_id, metadata)
        if last_object_id:
            self._log_store(last_object_id)

    def _format_commit_message(self, metadata):
        return pprint.pformat(to_native(metadata))

    def _parse_commit_message(self, commit_message):
        try:
            return ast.literal_eval(commit_message)
        except ValueError:
            return self._parse_commit_message_dbus(commit_message)

    def _parse_commit_message_dbus(self, commit_message):
        # Compatibility work-around to parse commit messages
        # written by previous versions and containing dbus.Int()
        # instead of plain integer literals.
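        # Repeatedly strip dbus.Dictionary() and basic dbus type wrappers,
        # keeping only the plain literal captured by the 'value' group, until
        # nothing is left to substitute, then evaluate the result.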
        num_subs = 1
        while num_subs:
            commit_message, num_subs = re.subn(_DBUS_METADATA_DICTIONARY_RE,
                                               '\g<value>', commit_message)
        num_subs = 1
        while num_subs:
            commit_message, num_subs = re.subn(_DBUS_METADATA_BASIC_RE,
                                               '\g<value>', commit_message)
        return ast.literal_eval(commit_message)

    def _gen_uuid(self):
        return str(uuid.uuid4())

    def _git_call(self, command, args=None, input=None, input_fd=None,
                  stdout_fd=None, work_dir=None, index_path=None,
                  git_dir=None):
        env = dict(self._git_env)
        if work_dir:
            env['GIT_WORK_TREE'] = work_dir
        if index_path:
            env['GIT_INDEX_FILE'] = index_path
        logging.debug('calling git %s, env=%r',
                      ['git', command] + (args or []), env)
        pipe = Popen(['git', command] + (args or []),
                     stdin=input_fd or PIPE, stdout=stdout_fd or PIPE,
                     stderr=PIPE, close_fds=True,
                     cwd=git_dir or self._git_dir, env=env)
        stdout, stderr = pipe.communicate(input)
        if pipe.returncode:
            raise GitError(pipe.returncode, stderr)
        return stdout

    def _invoke_callbacks(self, signal, *args):
        for callback in self._callbacks.get(signal, []):
            callback(*args)

    def _store_entry_in_git(self, tree_id, version_id, parent_id, path,
                            metadata):
        commit_message = self._format_commit_message(metadata)
        tree_hash = self._write_tree(path)
        commit_hash = self._git_call('commit-tree', [tree_hash],
                                     input=commit_message).strip()
        self._git_call('update-ref',
                       [_format_ref(tree_id, version_id), commit_hash])

    def _write_tree(self, path):
        if not path:
            return self._git_call('hash-object',
                                  ['-w', '-t', 'tree', '--stdin'],
                                  input='').strip()
        index_dir = tempfile.mkdtemp(prefix='gdatastore-')
        index_path = os.path.join(index_dir, 'index')
        try:
            self._add_to_git_index(index_path, path)
            return self._git_call('write-tree', index_path=index_path).strip()
        finally:
            shutil.rmtree(index_dir)

    def _get_object_ids_from_git(self):
        args = ['--sort=committerdate', '--format=%(refname)',
                'refs/gdatastore/*/*']
        return [tuple(line.rsplit('/', 2)[1:])
                for line in self._git_call('for-each-ref', args).split()]

    def _get_metadata_from_git(self, object_id):
        args = ['commit', _format_ref(*object_id)]
        commit_message = self._git_call('cat-file', args).split('\n\n', 1)[1]
        return self._parse_commit_message(commit_message)

    def _log_store(self, object_id):
        """Record the fact that we tried to store the given object

        Make sure we know on next start-up that the object with the given
        object_id was the last one to be processed. Used for checking the
        index on start-up and triggering a rebuild if necessary.
        """
        log_name = os.path.join(self._base_dir, 'last_object_id')
        tmp_name = log_name + '.tmp'
        with open(tmp_name, 'w') as f:
            f.write(repr(tuple(object_id)))
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_name, log_name)

    def _get_last_object_id_from_log(self):
        """Return the object_id saved by _log_store()

        Return the object_id of the last object to be processed, as written
        by _log_store(). If no such log exists, return None.
""" log_name = os.path.join(self._base_dir, 'last_object_id') if not os.path.exists(log_name): return None return ast.literal_eval(open(log_name).read()) def calculate_checksum(path): checksum = hashlib.sha1() f = file(path) while True: chunk = f.read(65536) if not chunk: return checksum.hexdigest() checksum.update(chunk) def to_native(value): if isinstance(value, list): return [to_native(e) for e in value] elif isinstance(value, dict): return dict([(to_native(k), to_native(v)) for k, v in value.items()]) elif isinstance(value, unicode): return unicode(value) elif isinstance(value, str): return str(value) elif isinstance(value, int): return int(value) elif isinstance(value, float): return float(value) else: raise TypeError('Unknown type: %s' % (type(value), )) def _format_ref(tree_id, version_id): return 'refs/gdatastore/%s/%s' % (tree_id, version_id) def _guess_extension(mime_type): if sugar_mime is None: return '' extension = sugar_mime.get_primary_extension(mime_type) if not extension: return '' return '.' + extension