# # Author: Sascha Silbe (OpenPGP signed mails only) # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # TODO: investigate python-fs # import collections import errno import functools import logging import os import stat import threading import time import dbus import xapian import xdg.BaseDirectory import xdg.Mime DS_DBUS_SERVICE = 'org.laptop.sugar.DataStore' DS_DBUS_INTERFACE1 = 'org.laptop.sugar.DataStore' DS_DBUS_PATH1 = '/org/laptop/sugar/DataStore' DS_DBUS_INTERFACE2 = 'org.laptop.sugar.DataStore2' DS_DBUS_PATH2 = '/org/laptop/sugar/DataStore2' # nearly infinite DBUS_TIMEOUT_MAX = 2 ** 31 / 1000 DBUS_PYTHON_VALUE_ERROR = 'org.freedesktop.DBus.Python.ValueError' _USEFUL_PROPS = ['mime_type', 'tags', 'timestamp', 'title'] """Metadata properties used for determining the file name of an entry""" def synchronised(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): with self._lock: return func(self, *args, **kwargs) return wrapper class _LRU(collections.MutableMapping): """Simple, but reasonably fast Least Recently Used (LRU) cache""" def __init__(self, capacity): self.capacity = capacity self._dict = {} self._q = collections.deque() self.__contains__ = self._dict.__contains__ def __delitem__(self, key): self._q.remove(key) del self._dict[key] def __iter__(self): return self._dict.__iter__() def __getitem__(self, key): value = self._dict[key] if self._q[-1] == key: return value self._q.remove(key) self._q.append(key) return value def __len__(self): return len(self._q) def __setitem__(self, key, value): if key in self._dict: self._q.remove(key) elif len(self._dict) == self.capacity: del self._dict[self._q.popleft()] self._q.append(key) self._dict[key] = value def clear(self): self._q.clear() self._dict.clear() class DataStore(object): def __init__(self, root_query): self._root_query = root_query self.supports_versions = False self._lock = threading.RLock() self._data_store_version = 0 bus = dbus.SessionBus() try: self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH2), DS_DBUS_INTERFACE2) self._data_store.find({'tree_id': 'invalid'}, {'metadata': ['tree_id']}) self.supports_versions = True logging.info('Data store with version support found') return except dbus.DBusException: logging.debug('No data store with version support found') self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, DS_DBUS_PATH1), DS_DBUS_INTERFACE1) self._data_store.find({'uid': 'invalid'}, ['uid']) logging.info('Data store without version support found') if 'uri' in self._data_store.mounts()[0]: self._data_store_version = 82 data_store_path = '/home/olpc/.sugar/default/datastore' self._data_store_mount_id = [mount['id'] for mount in self._data_store.mounts() if mount['uri'] == data_store_path][0] logging.info('0.82 data store found') else: logging.info('0.84+ data store without version support found') self._data_store_version = 84 @synchronised def list_object_ids(self, query=None): """Retrieve the object_ids of all (matching) data store entries Only return the latest version of each entry for data stores with version support. """ query = self._merge_root_query(query) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: options = {'metadata': ['tree_id', 'version_id'], 'order_by': ['-timestamp']} return [(unicode(entry['tree_id']), unicode(entry['version_id'])) for entry in self._data_store.find(query, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0]] elif self._data_store_version == 82: properties = ['uid', 'mountpoint'] return [unicode(entry['uid']) for entry in self._data_store.find(query, properties, byte_arrays=True, timeout=DBUS_TIMEOUT_MAX)[0] if entry['mountpoint'] == self._data_store_mount_id] else: return [unicode(entry['uid']) for entry in self._data_store.find(query, ['uid'], byte_arrays=True, timeout=DBUS_TIMEOUT_MAX)[0]] @synchronised def list_metadata(self, query=None): """Retrieve object_id and selected metadata of matching entries Only return the latest version of each entry for data stores with version support. Returns a list of tuples containing the object_id and metadata. """ query = self._merge_root_query(query) properties = list(_USEFUL_PROPS) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: properties += ['parent_id', 'tree_id', 'version_id'] options = {'metadata': properties, 'all_versions': True, 'order_by': ['-timestamp']} return [((unicode(entry['tree_id']), unicode(entry['version_id'])), self._convert_metadata(entry)) for entry in self._data_store.find(query, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0]] elif self._data_store_version == 82: properties += ['uid', 'mountpoint'] return [(unicode(entry['uid']), self._convert_metadata(entry)) for entry in self._data_store.find(query, properties, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] if entry['mountpoint'] == self._data_store_mount_id] else: properties += ['uid'] return [(unicode(entry['uid']), self._convert_metadata(entry)) for entry in self._data_store.find(query, properties, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0]] @synchronised def list_versions(self, tree_id): """Retrieve all version_ids of the given data store entry""" query = dict(self._root_query) query['tree_id'] = tree_id options = {'all_versions': True, 'order_by': ['-timestamp']} return [unicode(entry['version_id']) for entry in self._data_store.find(query, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0]] @synchronised def list_tree_ids(self, query=None): """Retrieve the tree_ids of all (matching) data store entries""" query = self._merge_root_query(query) return [unicode(entry[0]) for entry in self.list_object_ids(query)] @synchronised def list_property_values(self, name, query=None): """Return all unique values of the given property""" assert isinstance(name, unicode) query = self._merge_root_query(query) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: options = {'metadata': [name], 'all_versions': True} entries = self._data_store.find(query, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] else: # We can't use get_uniquevaluesfor() as sugar-datastore # only supports it for activity_id, which is not what # we need. entries = self._data_store.find(query, [name], timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] return dict.fromkeys([entry.get(name) for entry in entries]).keys() @synchronised def check_object_id(self, object_id): """Return True if the given object_id identifies a data store entry""" try: entry = self.get_properties(object_id, self._root_query.keys()) except dbus.DBusException, exception: if exception.get_dbus_name() == DBUS_PYTHON_VALUE_ERROR: return False raise return True @synchronised def check_tree_id(self, tree_id): """Return True if the given tree_id identifies a data store entry""" assert isinstance(tree_id, unicode) query = dict(self._root_query) query['tree_id'] = tree_id results = self._data_store.find(query, {}, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] return bool(results) @synchronised def check_property_contains(self, name, word): """Return True if there is at least one entry containing word in the given property """ assert isinstance(name, unicode) assert isinstance(word, unicode) query_string = u'%s:"%s"' % (name, word.replace(u'"', u'')) query = dict(self._root_query) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: options = {'limit': 1} results = self._data_store.text_search(query, query_string, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] else: query['query'] = query_string query['limit'] = 1 results = self._data_store.find(query, [name], timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] return bool(results) @synchronised def get_properties(self, object_id, names=None): """Read given properties for data store entry identified by object_id Returns a dictionary with unicode strings as keys and values. """ query = dict(self._root_query) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: tree_id, version_id = object_id assert isinstance(tree_id, unicode) assert isinstance(version_id, unicode) query['tree_id'] = tree_id query['version_id'] = version_id options = {} if names: options['metadata'] = names results = self._data_store.find(query, options, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] if not results: raise ValueError('Object %r does not exist' % (object_id, )) return self._convert_metadata(results[0]) else: assert isinstance(object_id, unicode) query['uid'] = object_id results = self._data_store.find(query, names or [], timeout=DBUS_TIMEOUT_MAX, byte_arrays=True)[0] metadata = self._data_store.get_properties(object_id, byte_arrays=True) if not results: raise ValueError('Object %r does not exist' % (object_id, )) metadata = results[0] metadata['uid'] = object_id if names: metadata = dict([(name, metadata[name]) for name in names if name in metadata]) return self._convert_metadata(metadata) @synchronised def list_properties(self, object_id): """List the names of all properties for this entry Returns a list of unicode strings. """ return self.get_properties(object_id).keys() @synchronised def create_property(self, object_id, name, value): """Set the given property, raising an error if it already exists""" assert isinstance(name, unicode) metadata = self.get_properties(object_id) if name in metadata: raise IOError(errno.EEXIST, os.strerror(errno.EEXIST)) metadata[name] = value self._change_metadata(object_id, metadata) @synchronised def replace_property(self, object_id, name, value): """Modify the given, already existing property""" assert isinstance(name, unicode) assert isinstance(value, unicode) metadata = self.get_properties(object_id) if name not in metadata: # on Linux ENOATTR=ENODATA (Python errno doesn't contain ENOATTR) raise IOError(errno.ENODATA, os.strerror(errno.ENODATA)) metadata[name] = value self._change_metadata(object_id, metadata) @synchronised def set_properties(self, object_id, properties): """Write the given (sub)set of properties properties -- metadata as dictionary with unicode strings as keys and values """ assert not [True for key, value in properties.items() if (not isinstance(key, unicode)) or (not isinstance(value, unicode))] metadata = self.get_properties(object_id) metadata.update(properties) self._change_metadata(object_id, metadata) @synchronised def remove_properties(self, object_id, names): """Remove the given (sub)set of properties names -- list of property names (unicode strings) """ metadata = self.get_properties(object_id) for name in names: assert isinstance(name, unicode) if name not in metadata: # on Linux ENOATTR=ENODATA (and no ENOATTR in errno module) raise IOError(errno.ENODATA, os.strerror(errno.ENODATA)) del metadata[name] self._change_metadata(object_id, metadata) @synchronised def remove_entry(self, object_id): """Remove a single (version of a) data store entry""" # Make sure we don't allow deleting entries that don't match # the root query. if not self.check_object_id(object_id): raise ValueError('Object %r does not exist' % (object_id, )) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: tree_id, version_id = object_id self._data_store.delete(tree_id, version_id, timeout=DBUS_TIMEOUT_MAX) else: self._data_store.delete(object_id, timeout=DBUS_TIMEOUT_MAX) @synchronised def create_new(self, properties): """Create a new data store entry properties -- metadata as dictionary with unicode strings as keys and values """ assert not [True for key, value in properties.items() if (not isinstance(key, unicode)) or (not isinstance(value, unicode))] if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: return self._data_store.save('', '', properties, '', False, timeout=DBUS_TIMEOUT_MAX) else: return self._data_store.create(properties, '', False) @synchronised def get_data(self, object_id): """Return path to data for data store entry identified by object_id.""" # Make sure we don't allow deleting entries that don't match # the root query. if not self.check_object_id(object_id): raise ValueError('Object %r does not exist' % (object_id, )) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: tree_id, version_id = object_id return self._data_store.get_data(tree_id, version_id, byte_arrays=True) else: return self._data_store.get_filename(object_id, byte_arrays=True) @synchronised def get_size(self, object_id): # FIXME: make use of filesize property if available path = self.get_data(object_id) if not path: return 0 size = os.stat(path).st_size os.remove(path) return size @synchronised def write_data(self, object_id, path): """Update data for data store entry identified by object_id. Return object_id of the updated entry. If the data store does not support versions, this will be the same as the one given as parameter. path -- Path of data file in real file system (string) """ assert isinstance(path, str) properties = self.get_properties(object_id) if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: tree_id, parent_id = object_id res = self._data_store.save(tree_id, parent_id, properties, path, False, timeout=DBUS_TIMEOUT_MAX, byte_arrays=True) tree_id_, child_id = res assert tree_id == tree_id_ return unicode(tree_id), unicode(child_id) else: self._data_store.update(object_id, properties, path, False, timeout=DBUS_TIMEOUT_MAX) return unicode(object_id) def _merge_root_query(self, query): query = dict(query or {}) xapian_query = query.get('query', '') query.update(self._root_query) if ('query' in self._root_query) and xapian_query: if not check_xapian_query(xapian_query): raise ValueError('Invalid Xapian query: %r' % xapian_query) query['query'] = '(%s) AND (%s)' % (self._root_query['query'], xapian_query) return query def _convert_metadata(self, metadata): """Convert metadata (as returned by the data store) to a unicode dict The data store may keep the data type it got as input or convert it to a string, at it's own discretion. To keep our processing sane and independent of the data store implementation, we pass unicode strings as input and convert output to unicode strings. As an exception, we keep values that cannot be converted from UTF-8 (e.g. previews in PNG format) as (binary) strings. """ metadata_unicode = dict() for key, value in metadata.items(): if isinstance(key, str): key_unicode = unicode(key, 'utf-8') else: key_unicode = unicode(key) if isinstance(value, str): try: value_unicode = unicode(value, 'utf-8') except UnicodeDecodeError: # Keep binary strings as-is value_unicode = value else: value_unicode = unicode(value) metadata_unicode[key_unicode] = value_unicode return metadata_unicode def _change_metadata(self, object_id, metadata): if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: tree_id, version_id = object_id self._data_store.change_metadata(tree_id, version_id, metadata) else: self._data_store.update(object_id, metadata, '', False) class FSEntry(object): def __init__(self, file_system, mode): self._fs = file_system self._ds = file_system.data_store self.mode = mode def get_properties(self, names=None, use_cache=False): """Read the given properties (default: all) Returns a dictionary with unicode strings as keys and values. As an exception, values that cannot be converted from UTF-8 (e.g. previews in PNG format) are represented by (binary) strings. """ return [] def list_properties(self): """List the names of all properties for this entry Returns a list of unicode strings. """ return [] def lookup(self, name_): raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) def mkdir(self, name_): raise IOError(errno.EACCES, os.strerror(errno.EACCES)) def readlink(self): raise IOError(errno.EINVAL, os.strerror(errno.EINVAL)) def remove(self): """Remove this entry""" raise IOError(errno.EACCES, os.strerror(errno.EACCES)) def create_property(self, name, value): """Set the given property, raising an error if it already exists""" raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP)) def replace_property(self, name, value): """Modify the given, already existing property""" raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP)) def set_properties(self, properties): """Write the given (sub)set of properties properties -- dictionary with unicode strings as keys and values """ raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP)) def remove_properties(self, names): """Remove the given (sub)set of properties names -- list of property names (unicode strings) """ raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP)) def get_ctime(self): """Return the time the object was originally created Return POSIX timestamp as float.""" return 0. def get_mtime(self): """Return the time the object was last modified Return POSIX timestamp as float.""" return time.time() def get_data(self): """Return the entire content of this entry""" # FIXME: inefficient or even impractical for large files raise IOError(errno.EISDIR, os.strerror(errno.EISDIR)) def get_size(self): """Return the size of the content in bytes""" return 0 class Symlink(FSEntry): def __init__(self, file_system, target): assert isinstance(target, unicode) FSEntry.__init__(self, file_system, stat.S_IFLNK | 0777) self.target = target def readlink(self): return self.target def __repr__(self): return 'Symlink(%r, %r)' % (self._fs, self.target) class DSObjectBase(FSEntry): def __init__(self, file_system, mode, object_id, metadata=None): FSEntry.__init__(self, file_system, mode) self.object_id = object_id self._metadata = metadata self._have_nonstandard = False def get_properties(self, names=None, use_cache=False): names = names or [] nonstandard_names = bool([True for name in names if name not in _USEFUL_PROPS]) if names and not nonstandard_names: fetch_names = names else: fetch_names = None if ((not use_cache) or (self._metadata is None) or (nonstandard_names and not self._have_nonstandard)): self._metadata = self._ds.get_properties(self.object_id, fetch_names) self._have_nonstandard = nonstandard_names if not names: return self._metadata return dict([(name, self._metadata[name]) for name in names if name in self._metadata]) def get_ctime(self): props = self.get_properties([u'creation_time', u'timestamp']) try: return float(props[u'creation_time']) except (KeyError, ValueError, TypeError): pass try: return float(props[u'timestamp']) except (KeyError, ValueError, TypeError): return time.time() def get_mtime(self): props = self.get_properties([u'creation_time', u'timestamp']) try: return float(props[u'timestamp']) except (KeyError, ValueError, TypeError): return time.time() class ObjectSymlink(Symlink, DSObjectBase): def __init__(self, file_system, target, object_id, metadata=None): assert isinstance(target, unicode) DSObjectBase.__init__(self, file_system, stat.S_IFLNK | 0777, object_id, metadata) self.target = target class DSObject(DSObjectBase): def __init__(self, file_system, object_id, metadata=None): DSObjectBase.__init__(self, file_system, stat.S_IFREG | 0750, object_id, metadata) def list_properties(self): return self._ds.list_properties(self.object_id) def create_property(self, name, value): return self._ds.create_property(self.object_id, name, value) def replace_property(self, name, value): return self._ds.replace_property(self.object_id, name, value) def set_properties(self, properties): return self._ds.set_properties(self.object_id, properties) def remove(self): self._ds.remove_entry(self.object_id) def remove_properties(self, names): return self._ds.remove_properties(self.object_id, names) def get_data(self): return self._ds.get_data(self.object_id) def write_data(self, file_name): return self._ds.write_data(self.object_id, file_name) def get_size(self): return self._ds.get_size(self.object_id) class Directory(FSEntry): def __init__(self, file_system, level, mode, parent=None): self.parent = parent self.level = level FSEntry.__init__(self, file_system, stat.S_IFDIR | mode) def listdir(self): yield u'.' yield u'..' def readdir(self): yield (u'.', self) if self.parent is not None: yield (u'..', self.parent) def _get_symlink(self, object_id, metadata=None): directory_path = u'../' * self.level + u'by-id/' if isinstance(object_id, tuple): assert (isinstance(object_id[0], unicode) and isinstance(object_id[1], unicode)) return ObjectSymlink(self._fs, directory_path + u'/'.join(object_id), object_id, metadata) else: assert isinstance(object_id, unicode) return ObjectSymlink(self._fs, directory_path + object_id, object_id, metadata) class ByTitleDirectory(Directory): def __init__(self, file_system, level, parent): Directory.__init__(self, file_system, level, 0550, parent) self._query = None def listdir(self): for name in Directory.listdir(self): yield name for object_id in self._ds.list_object_ids(self._query): name = self._fs.lookup_title_name(object_id) yield name def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for object_id, metadata in self._ds.list_metadata(self._query): name = self._fs.lookup_title_name(object_id, metadata) yield (name, self._get_symlink(object_id, metadata)) def lookup(self, name): object_id = self._fs.resolve_title_name(name) return self._get_symlink(object_id) def mknod(self, name): if self._fs.try_resolve_title_name(name): raise IOError(errno.EEXIST, os.strerror(errno.EEXIST)) object_id_ = self._ds.create_new({'title': name}) class ByUidDirectory(Directory): def __init__(self, file_system, level, parent): Directory.__init__(self, file_system, level, 0550, parent) def lookup(self, object_id): if not self._ds.check_object_id(object_id): raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) return DSObject(self._fs, object_id) def listdir(self): for name in Directory.listdir(self): yield name for object_id in self._ds.list_object_ids(): yield object_id def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for object_id in self._ds.list_object_ids(): yield (object_id, DSObject(self._fs, object_id)) class ByVersionIdDirectory(Directory): def __init__(self, file_system, level, parent, tree_id): self._tree_id = tree_id Directory.__init__(self, file_system, level, 0550, parent) def lookup(self, version_id): object_id = (self._tree_id, version_id) if not self._ds.check_object_id(object_id): raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) return DSObject(self._fs, object_id) def listdir(self): for name in Directory.listdir(self): yield name for version_id in self._ds.list_versions(self._tree_id): yield (self._tree_id, version_id) def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for version_id in self._ds.list_versions(self._tree_id): object_id = (self._tree_id, version_id) yield (object_id, DSObject(self._fs, object_id)) class ByTreeIdDirectory(Directory): def __init__(self, file_system, level, parent): Directory.__init__(self, file_system, level, 0550, parent) def lookup(self, tree_id): if not self._ds.check_tree_id(tree_id): raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) return ByVersionIdDirectory(self._fs, self.level + 1, self, tree_id) def listdir(self): for name in Directory.listdir(self): yield name for tree_id in self._ds.list_tree_ids(): yield tree_id def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for tree_id in self._ds.list_tree_ids(): yield (tree_id, ByVersionIdDirectory(self._fs, self.level + 1, self, tree_id)) class ByTagsSubDirectory(ByTitleDirectory): def __init__(self, file_system, level, parent, tags): self._tags = frozenset(tags) ByTitleDirectory.__init__(self, file_system, level, parent) def mknod(self, name): if self._fs.try_resolve_title_name(name): raise IOError(errno.EEXIST, os.strerror(errno.EEXIST)) props = {'title': name, 'tags': ' '.join(self._tags)} object_id_ = self._ds.create_new(props) def listdir(self): for name in Directory.listdir(self): yield name for object_id, metadata in self._find_entries(): name = self._fs.lookup_title_name(object_id, metadata) yield name def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for object_id, metadata in self._find_entries(): name = self._fs.lookup_title_name(object_id, metadata) yield (name, self._get_symlink(object_id, metadata)) def _find_entries(self): query = {'query': ' '.join(self._tags)} for object_id, props in self._ds.list_metadata(query): entry_tags = frozenset(props.get('tags', '').split()) if self._tags - entry_tags: continue yield object_id, props class ByTagsDirectory(Directory): def __init__(self, file_system, level, parent): Directory.__init__(self, file_system, level, 0550, parent) self._tag_dirs = {} def listdir(self): for name in Directory.listdir(self): yield name for tag in self._list_tags(): if u'/' in tag or tag.startswith(u'.'): continue yield tag def readdir(self): for name, entry in Directory.readdir(self): yield name, entry for tag in self._list_tags(): if u'/' in tag or tag.startswith(u'.'): continue if tag not in self._tag_dirs: self._tag_dirs[tag] = ByTagsSubDirectory(self._fs, self.level + 1, self, [tag]) yield (tag, self._tag_dirs[tag]) def lookup(self, name): if name not in self._tag_dirs: if not self._check_tag(name): raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) self._tag_dirs[name] = ByTagsSubDirectory(self._fs, self.level + 1, self, [name]) return self._tag_dirs[name] def _check_tag(self, name): return self._ds.check_property_contains(u'tags', name) def _list_tags(self): tags = set() for value in self._ds.list_property_values(u'tags'): tags.update((value or u'').split()) tags.discard(u'') return sorted(tags) class SearchResultDirectory(ByTitleDirectory): def __init__(self, file_system, level, parent, query): ByTitleDirectory.__init__(self, file_system, level, parent) self._query = query class RootDirectory(Directory): def __init__(self, file_system, mode): Directory.__init__(self, file_system, 0, mode, None) self._by_tags_directory = ByTagsDirectory(file_system, 1, self) self._by_title_directory = ByTitleDirectory(file_system, 1, self) if self._ds.supports_versions: self._by_id_directory = ByTreeIdDirectory(file_system, 1, self) else: self._by_id_directory = ByUidDirectory(file_system, 1, self) def listdir(self): for name in Directory.listdir(self): yield name yield u'by-id' yield u'by-tags' yield u'by-title' def readdir(self): for name, entry in Directory.readdir(self): yield name, entry yield (u'by-id', self._by_id_directory) yield (u'by-tags', self._by_tags_directory) yield (u'by-title', self._by_title_directory) def lookup(self, name): if name == u'by-id': return self._by_id_directory elif name == u'by-tags': return self._by_tags_directory elif name == u'by-title': return self._by_title_directory raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) class FSEmulation(object): # public API def __init__(self, root_query=None): self.data_store = DataStore(root_query or {}) # FIXME: determine good LRU size self._cache = _LRU(500) self._root_dir = RootDirectory(self, 0550) self._object_id_to_title_name = {} self._title_name_to_object_id = {} self._mime_type_exts = self._load_mime_type_exts() def resolve(self, path, follow_links=False): assert isinstance(path, unicode) stripped_path = path.strip(u'/') if not stripped_path: return self._root_dir partial_path = u'' entry = self._root_dir for component in stripped_path.split(u'/'): partial_path += u'/' + component # FIXME: add cache (in)validation if partial_path not in self._cache: self._cache[partial_path] = entry.lookup(component) entry = self._cache[partial_path] if path.endswith(u'/') and not isinstance(entry, Directory): raise IOError(errno.ENOTDIR, os.strerror(errno.ENOTDIR)) if isinstance(entry, Symlink) and follow_links: target = u'/%s/../%s' % (stripped_path, entry.readlink()) target_abs = os.path.abspath(target) return self.resolve(target_abs, follow_links=True) return entry def search(self, query): return SearchResultDirectory(self, 1, self._root_dir, query) # internal API def resolve_title_name(self, name): if name not in self._title_name_to_object_id: # FIXME: Hack to fill self._title_name_to_object_id. To be # replaced by parsing the name and doing a specific search. list(self.resolve(u'/by-title').readdir()) try: return self._title_name_to_object_id[name] except KeyError: raise IOError(errno.ENOENT, os.strerror(errno.ENOENT)) def try_resolve_title_name(self, name): return self._title_name_to_object_id.get(name) def lookup_title_name(self, object_id, metadata=None): name = self._object_id_to_title_name.get(object_id) if name: return name if metadata is None: metadata = self.data_store.get_properties(object_id, _USEFUL_PROPS) name = self._generate_title_name(metadata, object_id) self._add_title_name(name, object_id) return name # private methods def _add_title_name(self, name, object_id): self._object_id_to_title_name[object_id] = name self._title_name_to_object_id[name] = object_id return name def _generate_title_name(self, metadata, object_id): title = metadata.get(u'title') try: mtime = float(metadata[u'timestamp']) except (KeyError, ValueError): mtime = time.time() time_human = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(mtime)) name = u'%s - %s' % (title, time_human) name = safe_name(name) extension = self._guess_extension(metadata.get('mime_type'), object_id) if extension: current_name = u'%s.%s' % (name, extension) else: current_name = name counter = 1 while current_name in self._title_name_to_object_id: counter += 1 if extension: current_name = u'%s %d.%s' % (name, counter, extension) else: current_name = u'%s %d' % (name, counter) return current_name def _remove_title_name_by_object_id(self, object_id): name = self._object_id_to_title_name.pop(object_id, None) if name: del self._title_name_to_object_id[name] def _remove_title_name_by_name(self, name): object_id = self._title_name_to_object_id.pop(name, None) if object_id: del self._object_id_to_title_name[object_id] def _guess_extension(self, mime_type, object_id): if not mime_type: file_name = self.data_store.get_data(object_id) if file_name: try: mime_type = xdg.Mime.get_type(file_name) finally: os.remove(file_name) return self._mime_type_exts.get(mime_type) def _load_mime_type_exts(self): """Return a heuristic mapping from MIME type to file name extension Return a map from MIME type to the best guess for its primary (preferred) file name extension. As most MIME type databases are not designed for this task, it's just a crude heuristic that will be off even for common MIME types. """ globs2_paths = list(xdg.BaseDirectory.load_data_paths( os.path.join('mime', 'globs2'))) rev_exts = {} # System locations usually give a better estimate of the # primary extension for a MIME type, so check them first. for path in reversed(globs2_paths): for line in open(path): line = line.strip() if line.startswith('#') or not line: continue weight_, type_name, glob_pattern = line.split(':', 2) if type_name in rev_exts: # There's already a better match (globs2 is sorted # by weight). continue if not glob_pattern.startswith('*.'): continue ext = glob_pattern[2:] if '*' in ext or '[' in ext: continue rev_exts[type_name] = ext return rev_exts def safe_name(name): return name.replace(u'/', u'_') def check_xapian_query(query): """Return False if Xapian query is invalid Return False if the given Xapian query contains incorrectly balanced parentheses. Return True otherwise. """ num_parens = 0 inside_quote = False for c in query: if c == '"': inside_quote = not inside_quote elif c == '(' and not inside_quote: num_parens +=1 elif c == ')' and not inside_quote: if num_parens < 1: return False num_parens -= 1 return num_parens == 0