diff options
author | Sascha Silbe <sascha-pgp@silbe.org> | 2010-08-28 21:11:29 (GMT) |
---|---|---|
committer | Sascha Silbe <sascha-pgp@silbe.org> | 2010-08-28 21:11:29 (GMT) |
commit | 779341f684e0996bbf1bde1e4b2df868545bcf80 (patch) | |
tree | 9c6fbb197474a0e89d23632f17bcecf395aa594b | |
parent | e77aecfefc2d5b45963bb6230cc85b91f9b809f8 (diff) |
merge code from in-Journal implementation, use worker process
-rw-r--r-- | backup.py | 489 |
1 file changed, 398 insertions, 91 deletions
@@ -2,7 +2,7 @@ # # Author: Sascha Silbe <sascha-pgp@silbe.org> # -# This program is free software; you can redistribute it and/or modify +# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # @@ -12,31 +12,32 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -"""Janitor. Activity to perform housekeeping tasks on the Sugar Journal. +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Backup. Activity to back up the Sugar Journal to external media. """ from gettext import gettext as _ import logging import os +import select +import sys import tempfile import time +import traceback import zipfile -#import gobject +import dbus +import gobject import gtk -#import pango -from sugar.activity.widgets import ActivityToolbarButton +#from sugar.activity.widgets import ActivityToolbarButton from sugar.activity.widgets import StopButton from sugar.activity import activity -from sugar.datastore import datastore import sugar.env -#from sugar.graphics import style from sugar.graphics.toolbutton import ToolButton from sugar.graphics.toolbarbox import ToolbarBox -#from sugar.graphics.toolbarbox import ToolbarButton import sugar.logger +from sugar import profile try: import json @@ -44,17 +45,24 @@ except ImportError: import simplejson as json +DS_DBUS_SERVICE = "org.laptop.sugar.DataStore" +DS_DBUS_INTERFACE1 = "org.laptop.sugar.DataStore" +DS_DBUS_PATH1 = "/org/laptop/sugar/DataStore" +DS_DBUS_INTERFACE2 = "org.laptop.sugar.DataStore2" +DS_DBUS_PATH2 = "/org/laptop/sugar/DataStore2" + + def format_size(size): if not size: return _('Empty') - elif size < 1024: - return _('%4d B') % size - elif size < 1024**2: - return 
_('%4d KB') % (size / 1024) - elif size < 1024**3: - return _('%4d MB') % (size / 1024**2) + elif size < 10*1024: + return _('%4d B') % size + elif size < 10*1024**2: + return _('%4d KiB') % (size // 1024) + elif size < 10*1024**3: + return _('%4d MiB') % (size // 1024**2) else: - return _('%4d GB') % (size / 1024**3) + return _('%4d GiB') % (size // 1024**3) class BackupButton(ToolButton): @@ -65,40 +73,333 @@ class BackupButton(ToolButton): self.props.accelerator = '<Alt>b' +class AsyncBackup(gobject.GObject): + """ + Run a data store backup asynchronously. + """ + + _METADATA_JSON_NAME = '_metadata.json' + + __gsignals__ = { + 'progress': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, + ([int, int])), + 'done': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([])), + 'error': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([str])), + } + + def __init__(self, mount_point): + gobject.GObject.__init__(self) + self._mount_point = mount_point + self._path = None + self._bundle = None + self._child_pid = None + self._pipe_from_child = None + self._pipe_to_child = None + self._pipe_from_child_watch_id = None + self._num_entries = None + self._entries = None + self._data_store = None + self._user_name = profile.get_nick_name().replace('/', ' ') + self._key_hash = profile.get_profile().privkey_hash + + if '\0' in self._user_name: + raise ValueError('Invalid user name') + + + def start(self): + """Start the backup process.""" + to_child_read_fd, to_child_write_fd = os.pipe() + from_child_read_fd, from_child_write_fd = os.pipe() + + self._child_pid = os.fork() + if not self._child_pid: + os.close(from_child_read_fd) + os.close(to_child_write_fd) + self._pipe_from_child = os.fdopen(from_child_write_fd, 'w') + self._pipe_to_child = os.fdopen(to_child_read_fd, 'r') + self._child_run() + sys.exit(0) + else: + os.close(from_child_write_fd) + os.close(to_child_read_fd) + self._pipe_from_child = os.fdopen(from_child_read_fd, 'r') + self._pipe_to_child = os.fdopen(to_child_write_fd, 
'w') + self._pipe_from_child_watch_id = gobject.io_add_watch( + self._pipe_from_child, + gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, + self._child_io_cb) + + def abort(self): + """Abort the backup and clean up.""" + self._pipe_to_child.write('abort\n') + self._parent_close() + + def _child_io_cb(self, source_, condition): + """Receive and handle message from child.""" + if condition in [gobject.IO_ERR, gobject.IO_HUP]: + logging.debug('error condition: %r', condition) + self.emit('error', _('Lost connection to child process')) + self._parent_close() + return False + + status = self._read_line_from_child() + if status == 'progress': + position = int(self._read_line_from_child()) + num_entries = int(self._read_line_from_child()) + self.emit('progress', position, num_entries) + return True + + elif status == 'done': + self.emit('done') + self._parent_close() + return False + + elif status == 'error': + message = unicode(self._read_line_from_child(), 'utf-8') + trace = unicode(self._pipe_from_child.read().strip(), 'utf-8') + logging.error('Child reported error: %s\n%s', message, trace) + self.emit('error', message) + self._parent_close() + return False + + else: + logging.error('Unknown status %r from child process', status) + self.emit('error', 'Unknown status %r from child process', status) + self.abort() + return False + + def _read_line_from_child(self): + """Read a line from the child process using low-level IO. + + This is a hack to work around the fact that file.readline() buffers + data without us knowing about it. If we call readline() a second + time when no data is buffered, it may block (=> the UI would hang). + If OTOH there is another line already in the buffer, we won't get + notified about it by select() as it already is in userspace. + There are cleaner ways to handle this (e.g. using the asyncore module), + but they are much more complex. 
+ """ + line = [] + while True: + c = os.read(self._pipe_from_child.fileno(), 1) + if c == '\n': + return ''.join(line) + + line.append(c) + + def _parent_close(self): + """Close connections to child and wait for it.""" + gobject.source_remove(self._pipe_from_child_watch_id) + self._pipe_from_child.close() + self._pipe_to_child.close() + pid_, status = os.waitpid(self._child_pid, 0) + if os.WIFEXITED(status): + logging.debug('Child exited with rc=%d', os.WEXITSTATUS(status)) + elif os.WIFSIGNALED(status): + logging.debug('Child killed by signal %d', os.WTERMSIG(status)) + else: + logging.error('Sudden infant death syndrome') + + def _child_run(self): + """Main program of child.""" + try: + self._connect_to_data_store() + self._entries, self._num_entries = self._find_entries() + assert self._num_entries == len(self._entries) + self._path, self._bundle = self._create_bundle() + + for position, entry in enumerate(self._entries): + self._client_check_command() + + self._send_to_parent('progress\n%d\n%d\n' % (position, + self._num_entries)) + logging.debug('processing entry %r', entry) + self._add_entry(self._bundle, entry) + + self._send_to_parent('progress\n%d\n%d\n' % ( + self._num_entries, self._num_entries)) + self._bundle.fp.flush() + self._bundle.close() + self._send_to_parent('done\n') + + # pylint: disable=W0703 + except Exception, exception: + self._pipe_from_child.write('error\n') + message = unicode(exception).encode('utf-8') + self._pipe_from_child.write(message+'\n') + trace = unicode(traceback.format_exc()).encode('utf-8') + self._pipe_from_child.write(trace) + self._remove_bundle() + sys.exit(2) + + def _send_to_parent(self, message): + self._pipe_from_child.write(message) + self._pipe_from_child.flush() + + def _client_check_command(self): + """Check for and execute command from the parent.""" + in_ready, out_ready_, errors_on_ = select.select([self._pipe_to_child], + [], [], 0) + if not in_ready: + return + + command = 
self._pipe_to_child.readline().strip() + logging.debug('command %r received', command) + if command == 'abort': + self._remove_bundle() + sys.exit(1) + else: + raise ValueError('Unknown command %r' % (command, )) + + def _create_bundle(self): + """Create / open bundle (zip file) with unique file name.""" + date = time.strftime('%x') + prefix = _('Journal backup of %s (%s) on %s') % (self._user_name, + self._key_hash, date) + bundle_fd, path = self._create_file(self._mount_point, prefix, '.xoj') + try: + return path, zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) + finally: + os.close(bundle_fd) + + def _remove_bundle(self): + """Close bundle and remove it from permanent storage.""" + if self._path: + os.remove(self._path) + + if self._bundle and self._bundle.fp and not self._bundle.fp.closed: + self._bundle.close() + + def _create_file(self, directory, prefix, suffix): + """Create a unique file with given prefix and suffix in directory. + + Append random ASCII characters only if necessary. 
+ """ + path = '%s/%s%s' % (directory, prefix, suffix) + flags = os.O_CREAT | os.O_EXCL + mode = 0600 + try: + return os.open(path, flags, mode), path + + except OSError: + return tempfile.mkstemp(dir=directory, prefix=prefix + ' ', + suffix=suffix) + + def _add_entry(self, bundle, entry): + """Add data store entry identified by entry to bundle.""" + if 'version_id' in entry: + object_id = (entry['tree_id'], entry['version_id']) + object_id_s = '%s,%s' % object_id + else: + object_id = entry['uid'] + object_id_s = object_id + + metadata = self._get_metadata(object_id) + data_path = self._get_data(object_id) + if data_path: + bundle.write(data_path, os.path.join(object_id_s, object_id_s)) + + for name, value in metadata.items(): + is_binary = False + try: + value.encode('utf-8') + except UnicodeDecodeError: + is_binary = True + + if is_binary or len(value) > 8192: + logging.debug('adding binary/large property %r', name) + bundle.writestr(os.path.join(object_id_s, str(name), + object_id_s), value) + del metadata[name] + + bundle.writestr(os.path.join(object_id_s, self._METADATA_JSON_NAME), + json.dumps(metadata)) + + def _connect_to_data_store(self): + """Open a connection to a Sugar data store.""" + # We forked => need to use a private connection and make sure we + # never allow the main loop to run + # http://lists.freedesktop.org/archives/dbus/2007-April/007498.html + # http://lists.freedesktop.org/archives/dbus/2007-August/008359.html + bus = dbus.SessionBus(private=True) + try: + self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, + DS_DBUS_PATH2), DS_DBUS_INTERFACE2) + logging.info('Data store with version support found') + return + + except dbus.DBusException: + logging.debug('No data store with version support found') + + self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE, + DS_DBUS_PATH1), DS_DBUS_INTERFACE1) + logging.info('Data store without version support found') + + def _find_entries(self): + """Retrieve a list of all entries 
from the data store.""" + if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: + return self._data_store.find({}, {'metadata': + ['tree_id', 'version_id'], 'all_versions': True}, + timeout=5*60, byte_arrays=True) + else: + return self._data_store.find({}, ['uid'], byte_arrays=True) + + def _get_metadata(self, object_id): + """Return metadata for data store entry identified by object_id.""" + if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: + tree_id, version_id = object_id + return self._data_store.find( + {'tree_id': tree_id, 'version_id': version_id}, {}, + byte_arrays=True)[0][0] + else: + return self._data_store.get_properties(object_id, byte_arrays=True) + + def _get_data(self, object_id): + """Return path to data for data store entry identified by object_id.""" + if self._data_store.dbus_interface == DS_DBUS_INTERFACE2: + tree_id, version_id = object_id + return self._data_store.get_data(tree_id, version_id, + byte_arrays=True) + else: + return self._data_store.get_filename(object_id, byte_arrays=True) + + class BackupActivity(activity.Activity): _METADATA_JSON_NAME = '_metadata.json' def __init__(self, handle): - activity.Activity.__init__(self, handle) + activity.Activity.__init__(self, handle, create_jobject=False) self.max_participants = 1 - self._msg_box_buf = None + self._progress_bar = None + self._message_box = None + self._backup = None self._setup_widgets() + def read_file(self, file_path): + """We don't have any state to save in the Journal.""" + return + + def write_file(self, file_path): + """We don't have any state to save in the Journal.""" + return + + def close(self, skip_save=False): + """We don't have any state to save in the Journal.""" + activity.Activity.close(self, skip_save=True) + def _setup_widgets(self): self._setup_toolbar() - self._setup_canvas() + self._setup_main_view() - def _setup_canvas(self): + def _setup_main_view(self): vbox = gtk.VBox() - - msg_box = gtk.TextView() - msg_box.set_editable(False) - 
msg_box.set_wrap_mode(gtk.WRAP_WORD) - self._msg_box_buf = msg_box.get_buffer() - vbox.pack_start(msg_box, expand=True, fill=True) - msg_box.show() - self.set_canvas(vbox) vbox.show() def _setup_toolbar(self): toolbar_box = ToolbarBox() - activity_button = ActivityToolbarButton(self) - toolbar_box.toolbar.insert(activity_button, -1) - activity_button.show() - backup_button = BackupButton() backup_button.connect('clicked', self.__backup_cb) toolbar_box.toolbar.insert(backup_button, -1) @@ -117,71 +418,77 @@ class BackupActivity(activity.Activity): self.set_toolbar_box(toolbar_box) toolbar_box.show() - def _add_msg(self, msg): - self._msg_box_buf.insert(self._msg_box_buf.get_end_iter(), - '%.1f %s\n' % (time.time(), msg)) - def __backup_cb(self, button): - try: - self._backup() - except Exception, exc: - logging.exception('Exception occured in _backup:') - self._add_msg('Error: %r' % (exc,)) - - def _backup(self): - # TODO: operate in chunks to conserve memory - # TODO: async operation - # TODO: better error handling - self._add_msg(_('Starting backup')) - path, bundle = self._create_bundle() - self._add_msg(_('Filename: %s') % (path, )) - for entry in datastore.find({})[0]: - self._add_msg(_('Processing entry %s (%s)') % (entry.object_id, - entry.metadata.get('title', ''))) - self._add_entry(bundle, entry) - - bundle.close() - self._add_msg(_('Backup finished')) + """Callback for Backup button.""" + # FIXME + mount_point = '/media/tmp' + self._setup_backup_view(mount_point) + self._start_backup(mount_point) + + def _start_backup(self, mount_point): + """Set up and start background worker process.""" + self._backup = AsyncBackup(mount_point) + self._backup.connect('progress', self._progress_cb) + self._backup.connect('error', self._error_cb) + self._backup.connect('done', self._done_cb) + self._backup.start() + + def _setup_backup_view(self, mount_point): + """Set up UI for showing feedback from worker process.""" + vbox = gtk.VBox(False) + + label = 
gtk.Label(_('Backing up Journal to %s') % (mount_point, )) + label.show() + vbox.pack_start(label) + + alignment = gtk.Alignment(xalign=0.5, yalign=0.5, xscale=0.5) + alignment.show() + + self._progress_bar = gtk.ProgressBar() + self._progress_bar.show() + alignment.add(self._progress_bar) + vbox.add(alignment) + + self._message_box = gtk.Label() + vbox.pack_start(self._message_box) + + # FIXME +# self._close_button = gtk.Button(_('Abort')) +# self._close_button.connect('clicked', self._close_cb) +# self._close_button.show() +# button_box = gtk.HButtonBox() +# button_box.show() +# button_box.add(self._close_button) +# vbox.pack_start(button_box, False) - def _create_bundle(self): - # TODO: create directly on target device - fd, path = tempfile.mkstemp() - try: - return path, zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) - finally: - os.close(fd) + vbox.show() + self.set_canvas(vbox) + self.show() - def _add_entry(self, bundle, entry): - metadata = entry.metadata.get_dictionary().copy() - logging.debug('metadata = %r', metadata) - object_id = entry.object_id - if isinstance(object_id, tuple): - metadata['tree_id'], metadata['version_id'] = object_id - object_id = '%s,%s' % object_id - else: - metadata['uid'] = object_id - object_id = str(object_id) + def _progress_cb(self, backup, position, num_entries): + """Update progress bar with information from child process.""" + self._progress_bar.props.text = '%d / %d' % (position, num_entries) + self._progress_bar.props.fraction = float(position) / num_entries - logging.debug('object_id = %r', object_id) - data_path = entry.file_path - if data_path: - bundle.write(data_path, os.path.join(object_id, object_id)) + def _done_cb(self, backup): + """Backup finished.""" + logging.debug('_done_cb') +# self._close_button.set_label(_('Finish')) - for name, value in metadata.items(): - is_binary = False - try: - value.encode('utf-8') - except UnicodeDecodeError: - is_binary = True + def _error_cb(self, backup, message): + 
"""Receive error message from child process.""" + self._show_error(message) - if is_binary or len(value) > 8192: - logging.debug('adding binary/large property %r', name) - bundle.writestr(os.path.join(object_id, str(name), object_id), - value) - del metadata[name] + def _show_error(self, message): + """Present error message to user.""" + self._message_box.props.label = message + self._message_box.show() - bundle.writestr(os.path.join(object_id, self._METADATA_JSON_NAME), - json.dumps(metadata)) +# def _close_cb(self, button): +# if not self._done: +# self._backup.abort() + +# self.emit('close') sugar.logger.start() |