Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSascha Silbe <sascha-pgp@silbe.org>2010-08-28 21:11:29 (GMT)
committer Sascha Silbe <sascha-pgp@silbe.org>2010-08-28 21:11:29 (GMT)
commit779341f684e0996bbf1bde1e4b2df868545bcf80 (patch)
tree9c6fbb197474a0e89d23632f17bcecf395aa594b
parente77aecfefc2d5b45963bb6230cc85b91f9b809f8 (diff)
merge code from in-Journal implementation, use worker process
-rw-r--r--backup.py489
1 files changed, 398 insertions, 91 deletions
diff --git a/backup.py b/backup.py
index 44978b7..736ba25 100644
--- a/backup.py
+++ b/backup.py
@@ -2,7 +2,7 @@
#
# Author: Sascha Silbe <sascha-pgp@silbe.org>
#
-# This program is free software; you can redistribute it and/or modify
+# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3
# as published by the Free Software Foundation.
#
@@ -12,31 +12,32 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-"""Janitor. Activity to perform housekeeping tasks on the Sugar Journal.
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Backup. Activity to back up the Sugar Journal to external media.
"""
from gettext import gettext as _
import logging
import os
+import select
+import sys
import tempfile
import time
+import traceback
import zipfile
-#import gobject
+import dbus
+import gobject
import gtk
-#import pango
-from sugar.activity.widgets import ActivityToolbarButton
+#from sugar.activity.widgets import ActivityToolbarButton
from sugar.activity.widgets import StopButton
from sugar.activity import activity
-from sugar.datastore import datastore
import sugar.env
-#from sugar.graphics import style
from sugar.graphics.toolbutton import ToolButton
from sugar.graphics.toolbarbox import ToolbarBox
-#from sugar.graphics.toolbarbox import ToolbarButton
import sugar.logger
+from sugar import profile
try:
import json
@@ -44,17 +45,24 @@ except ImportError:
import simplejson as json
+DS_DBUS_SERVICE = "org.laptop.sugar.DataStore"
+DS_DBUS_INTERFACE1 = "org.laptop.sugar.DataStore"
+DS_DBUS_PATH1 = "/org/laptop/sugar/DataStore"
+DS_DBUS_INTERFACE2 = "org.laptop.sugar.DataStore2"
+DS_DBUS_PATH2 = "/org/laptop/sugar/DataStore2"
+
+
def format_size(size):
if not size:
return _('Empty')
- elif size < 1024:
- return _('%4d B') % size
- elif size < 1024**2:
- return _('%4d KB') % (size / 1024)
- elif size < 1024**3:
- return _('%4d MB') % (size / 1024**2)
+ elif size < 10*1024:
+ return _('%4d B') % size
+ elif size < 10*1024**2:
+ return _('%4d KiB') % (size // 1024)
+ elif size < 10*1024**3:
+ return _('%4d MiB') % (size // 1024**2)
else:
- return _('%4d GB') % (size / 1024**3)
+ return _('%4d GiB') % (size // 1024**3)
class BackupButton(ToolButton):
@@ -65,40 +73,333 @@ class BackupButton(ToolButton):
self.props.accelerator = '<Alt>b'
+class AsyncBackup(gobject.GObject):
+ """
+ Run a data store backup asynchronously.
+ """
+
+ _METADATA_JSON_NAME = '_metadata.json'
+
+ __gsignals__ = {
+ 'progress': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
+ ([int, int])),
+ 'done': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([])),
+ 'error': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE, ([str])),
+ }
+
+ def __init__(self, mount_point):
+ gobject.GObject.__init__(self)
+ self._mount_point = mount_point
+ self._path = None
+ self._bundle = None
+ self._child_pid = None
+ self._pipe_from_child = None
+ self._pipe_to_child = None
+ self._pipe_from_child_watch_id = None
+ self._num_entries = None
+ self._entries = None
+ self._data_store = None
+ self._user_name = profile.get_nick_name().replace('/', ' ')
+ self._key_hash = profile.get_profile().privkey_hash
+
+ if '\0' in self._user_name:
+ raise ValueError('Invalid user name')
+
+
+ def start(self):
+ """Start the backup process."""
+ to_child_read_fd, to_child_write_fd = os.pipe()
+ from_child_read_fd, from_child_write_fd = os.pipe()
+
+ self._child_pid = os.fork()
+ if not self._child_pid:
+ os.close(from_child_read_fd)
+ os.close(to_child_write_fd)
+ self._pipe_from_child = os.fdopen(from_child_write_fd, 'w')
+ self._pipe_to_child = os.fdopen(to_child_read_fd, 'r')
+ self._child_run()
+ sys.exit(0)
+ else:
+ os.close(from_child_write_fd)
+ os.close(to_child_read_fd)
+ self._pipe_from_child = os.fdopen(from_child_read_fd, 'r')
+ self._pipe_to_child = os.fdopen(to_child_write_fd, 'w')
+ self._pipe_from_child_watch_id = gobject.io_add_watch(
+ self._pipe_from_child,
+ gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP,
+ self._child_io_cb)
+
+ def abort(self):
+ """Abort the backup and clean up."""
+ self._pipe_to_child.write('abort\n')
+ self._parent_close()
+
+ def _child_io_cb(self, source_, condition):
+ """Receive and handle message from child."""
+ if condition in [gobject.IO_ERR, gobject.IO_HUP]:
+ logging.debug('error condition: %r', condition)
+ self.emit('error', _('Lost connection to child process'))
+ self._parent_close()
+ return False
+
+ status = self._read_line_from_child()
+ if status == 'progress':
+ position = int(self._read_line_from_child())
+ num_entries = int(self._read_line_from_child())
+ self.emit('progress', position, num_entries)
+ return True
+
+ elif status == 'done':
+ self.emit('done')
+ self._parent_close()
+ return False
+
+ elif status == 'error':
+ message = unicode(self._read_line_from_child(), 'utf-8')
+ trace = unicode(self._pipe_from_child.read().strip(), 'utf-8')
+ logging.error('Child reported error: %s\n%s', message, trace)
+ self.emit('error', message)
+ self._parent_close()
+ return False
+
+ else:
+ logging.error('Unknown status %r from child process', status)
+ self.emit('error', 'Unknown status %r from child process', status)
+ self.abort()
+ return False
+
+ def _read_line_from_child(self):
+ """Read a line from the child process using low-level IO.
+
+ This is a hack to work around the fact that file.readline() buffers
+ data without us knowing about it. If we call readline() a second
+ time when no data is buffered, it may block (=> the UI would hang).
+ If OTOH there is another line already in the buffer, we won't get
+ notified about it by select() as it already is in userspace.
+ There are cleaner ways to handle this (e.g. using the asyncore module),
+ but they are much more complex.
+ """
+ line = []
+ while True:
+ c = os.read(self._pipe_from_child.fileno(), 1)
+ if c == '\n':
+ return ''.join(line)
+
+ line.append(c)
+
+ def _parent_close(self):
+ """Close connections to child and wait for it."""
+ gobject.source_remove(self._pipe_from_child_watch_id)
+ self._pipe_from_child.close()
+ self._pipe_to_child.close()
+ pid_, status = os.waitpid(self._child_pid, 0)
+ if os.WIFEXITED(status):
+ logging.debug('Child exited with rc=%d', os.WEXITSTATUS(status))
+ elif os.WIFSIGNALED(status):
+ logging.debug('Child killed by signal %d', os.WTERMSIG(status))
+ else:
+ logging.error('Sudden infant death syndrome')
+
+ def _child_run(self):
+ """Main program of child."""
+ try:
+ self._connect_to_data_store()
+ self._entries, self._num_entries = self._find_entries()
+ assert self._num_entries == len(self._entries)
+ self._path, self._bundle = self._create_bundle()
+
+ for position, entry in enumerate(self._entries):
+ self._client_check_command()
+
+ self._send_to_parent('progress\n%d\n%d\n' % (position,
+ self._num_entries))
+ logging.debug('processing entry %r', entry)
+ self._add_entry(self._bundle, entry)
+
+ self._send_to_parent('progress\n%d\n%d\n' % (
+ self._num_entries, self._num_entries))
+ self._bundle.fp.flush()
+ self._bundle.close()
+ self._send_to_parent('done\n')
+
+ # pylint: disable=W0703
+ except Exception, exception:
+ self._pipe_from_child.write('error\n')
+ message = unicode(exception).encode('utf-8')
+ self._pipe_from_child.write(message+'\n')
+ trace = unicode(traceback.format_exc()).encode('utf-8')
+ self._pipe_from_child.write(trace)
+ self._remove_bundle()
+ sys.exit(2)
+
+ def _send_to_parent(self, message):
+ self._pipe_from_child.write(message)
+ self._pipe_from_child.flush()
+
+ def _client_check_command(self):
+ """Check for and execute command from the parent."""
+ in_ready, out_ready_, errors_on_ = select.select([self._pipe_to_child],
+ [], [], 0)
+ if not in_ready:
+ return
+
+ command = self._pipe_to_child.readline().strip()
+ logging.debug('command %r received', command)
+ if command == 'abort':
+ self._remove_bundle()
+ sys.exit(1)
+ else:
+ raise ValueError('Unknown command %r' % (command, ))
+
+ def _create_bundle(self):
+ """Create / open bundle (zip file) with unique file name."""
+ date = time.strftime('%x')
+ prefix = _('Journal backup of %s (%s) on %s') % (self._user_name,
+ self._key_hash, date)
+ bundle_fd, path = self._create_file(self._mount_point, prefix, '.xoj')
+ try:
+ return path, zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
+ finally:
+ os.close(bundle_fd)
+
+ def _remove_bundle(self):
+ """Close bundle and remove it from permanent storage."""
+ if self._path:
+ os.remove(self._path)
+
+ if self._bundle and self._bundle.fp and not self._bundle.fp.closed:
+ self._bundle.close()
+
+ def _create_file(self, directory, prefix, suffix):
+ """Create a unique file with given prefix and suffix in directory.
+
+ Append random ASCII characters only if necessary.
+ """
+ path = '%s/%s%s' % (directory, prefix, suffix)
+ flags = os.O_CREAT | os.O_EXCL
+ mode = 0600
+ try:
+ return os.open(path, flags, mode), path
+
+ except OSError:
+ return tempfile.mkstemp(dir=directory, prefix=prefix + ' ',
+ suffix=suffix)
+
+ def _add_entry(self, bundle, entry):
+ """Add data store entry identified by entry to bundle."""
+ if 'version_id' in entry:
+ object_id = (entry['tree_id'], entry['version_id'])
+ object_id_s = '%s,%s' % object_id
+ else:
+ object_id = entry['uid']
+ object_id_s = object_id
+
+ metadata = self._get_metadata(object_id)
+ data_path = self._get_data(object_id)
+ if data_path:
+ bundle.write(data_path, os.path.join(object_id_s, object_id_s))
+
+ for name, value in metadata.items():
+ is_binary = False
+ try:
+ value.encode('utf-8')
+ except UnicodeDecodeError:
+ is_binary = True
+
+ if is_binary or len(value) > 8192:
+ logging.debug('adding binary/large property %r', name)
+ bundle.writestr(os.path.join(object_id_s, str(name),
+ object_id_s), value)
+ del metadata[name]
+
+ bundle.writestr(os.path.join(object_id_s, self._METADATA_JSON_NAME),
+ json.dumps(metadata))
+
+ def _connect_to_data_store(self):
+ """Open a connection to a Sugar data store."""
+ # We forked => need to use a private connection and make sure we
+ # never allow the main loop to run
+ # http://lists.freedesktop.org/archives/dbus/2007-April/007498.html
+ # http://lists.freedesktop.org/archives/dbus/2007-August/008359.html
+ bus = dbus.SessionBus(private=True)
+ try:
+ self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE,
+ DS_DBUS_PATH2), DS_DBUS_INTERFACE2)
+ logging.info('Data store with version support found')
+ return
+
+ except dbus.DBusException:
+ logging.debug('No data store with version support found')
+
+ self._data_store = dbus.Interface(bus.get_object(DS_DBUS_SERVICE,
+ DS_DBUS_PATH1), DS_DBUS_INTERFACE1)
+ logging.info('Data store without version support found')
+
+ def _find_entries(self):
+ """Retrieve a list of all entries from the data store."""
+ if self._data_store.dbus_interface == DS_DBUS_INTERFACE2:
+ return self._data_store.find({}, {'metadata':
+ ['tree_id', 'version_id'], 'all_versions': True},
+ timeout=5*60, byte_arrays=True)
+ else:
+ return self._data_store.find({}, ['uid'], byte_arrays=True)
+
+ def _get_metadata(self, object_id):
+ """Return metadata for data store entry identified by object_id."""
+ if self._data_store.dbus_interface == DS_DBUS_INTERFACE2:
+ tree_id, version_id = object_id
+ return self._data_store.find(
+ {'tree_id': tree_id, 'version_id': version_id}, {},
+ byte_arrays=True)[0][0]
+ else:
+ return self._data_store.get_properties(object_id, byte_arrays=True)
+
+ def _get_data(self, object_id):
+ """Return path to data for data store entry identified by object_id."""
+ if self._data_store.dbus_interface == DS_DBUS_INTERFACE2:
+ tree_id, version_id = object_id
+ return self._data_store.get_data(tree_id, version_id,
+ byte_arrays=True)
+ else:
+ return self._data_store.get_filename(object_id, byte_arrays=True)
+
+
class BackupActivity(activity.Activity):
_METADATA_JSON_NAME = '_metadata.json'
def __init__(self, handle):
- activity.Activity.__init__(self, handle)
+ activity.Activity.__init__(self, handle, create_jobject=False)
self.max_participants = 1
- self._msg_box_buf = None
+ self._progress_bar = None
+ self._message_box = None
+ self._backup = None
self._setup_widgets()
+ def read_file(self, file_path):
+ """We don't have any state to save in the Journal."""
+ return
+
+ def write_file(self, file_path):
+ """We don't have any state to save in the Journal."""
+ return
+
+ def close(self, skip_save=False):
+ """We don't have any state to save in the Journal."""
+ activity.Activity.close(self, skip_save=True)
+
def _setup_widgets(self):
self._setup_toolbar()
- self._setup_canvas()
+ self._setup_main_view()
- def _setup_canvas(self):
+ def _setup_main_view(self):
vbox = gtk.VBox()
-
- msg_box = gtk.TextView()
- msg_box.set_editable(False)
- msg_box.set_wrap_mode(gtk.WRAP_WORD)
- self._msg_box_buf = msg_box.get_buffer()
- vbox.pack_start(msg_box, expand=True, fill=True)
- msg_box.show()
-
self.set_canvas(vbox)
vbox.show()
def _setup_toolbar(self):
toolbar_box = ToolbarBox()
- activity_button = ActivityToolbarButton(self)
- toolbar_box.toolbar.insert(activity_button, -1)
- activity_button.show()
-
backup_button = BackupButton()
backup_button.connect('clicked', self.__backup_cb)
toolbar_box.toolbar.insert(backup_button, -1)
@@ -117,71 +418,77 @@ class BackupActivity(activity.Activity):
self.set_toolbar_box(toolbar_box)
toolbar_box.show()
- def _add_msg(self, msg):
- self._msg_box_buf.insert(self._msg_box_buf.get_end_iter(),
- '%.1f %s\n' % (time.time(), msg))
-
def __backup_cb(self, button):
- try:
- self._backup()
- except Exception, exc:
- logging.exception('Exception occured in _backup:')
- self._add_msg('Error: %r' % (exc,))
-
- def _backup(self):
- # TODO: operate in chunks to conserve memory
- # TODO: async operation
- # TODO: better error handling
- self._add_msg(_('Starting backup'))
- path, bundle = self._create_bundle()
- self._add_msg(_('Filename: %s') % (path, ))
- for entry in datastore.find({})[0]:
- self._add_msg(_('Processing entry %s (%s)') % (entry.object_id,
- entry.metadata.get('title', '')))
- self._add_entry(bundle, entry)
-
- bundle.close()
- self._add_msg(_('Backup finished'))
+ """Callback for Backup button."""
+ # FIXME
+ mount_point = '/media/tmp'
+ self._setup_backup_view(mount_point)
+ self._start_backup(mount_point)
+
+ def _start_backup(self, mount_point):
+ """Set up and start background worker process."""
+ self._backup = AsyncBackup(mount_point)
+ self._backup.connect('progress', self._progress_cb)
+ self._backup.connect('error', self._error_cb)
+ self._backup.connect('done', self._done_cb)
+ self._backup.start()
+
+ def _setup_backup_view(self, mount_point):
+ """Set up UI for showing feedback from worker process."""
+ vbox = gtk.VBox(False)
+
+ label = gtk.Label(_('Backing up Journal to %s') % (mount_point, ))
+ label.show()
+ vbox.pack_start(label)
+
+ alignment = gtk.Alignment(xalign=0.5, yalign=0.5, xscale=0.5)
+ alignment.show()
+
+ self._progress_bar = gtk.ProgressBar()
+ self._progress_bar.show()
+ alignment.add(self._progress_bar)
+ vbox.add(alignment)
+
+ self._message_box = gtk.Label()
+ vbox.pack_start(self._message_box)
+
+ # FIXME
+# self._close_button = gtk.Button(_('Abort'))
+# self._close_button.connect('clicked', self._close_cb)
+# self._close_button.show()
+# button_box = gtk.HButtonBox()
+# button_box.show()
+# button_box.add(self._close_button)
+# vbox.pack_start(button_box, False)
- def _create_bundle(self):
- # TODO: create directly on target device
- fd, path = tempfile.mkstemp()
- try:
- return path, zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
- finally:
- os.close(fd)
+ vbox.show()
+ self.set_canvas(vbox)
+ self.show()
- def _add_entry(self, bundle, entry):
- metadata = entry.metadata.get_dictionary().copy()
- logging.debug('metadata = %r', metadata)
- object_id = entry.object_id
- if isinstance(object_id, tuple):
- metadata['tree_id'], metadata['version_id'] = object_id
- object_id = '%s,%s' % object_id
- else:
- metadata['uid'] = object_id
- object_id = str(object_id)
+ def _progress_cb(self, backup, position, num_entries):
+ """Update progress bar with information from child process."""
+ self._progress_bar.props.text = '%d / %d' % (position, num_entries)
+ self._progress_bar.props.fraction = float(position) / num_entries
- logging.debug('object_id = %r', object_id)
- data_path = entry.file_path
- if data_path:
- bundle.write(data_path, os.path.join(object_id, object_id))
+ def _done_cb(self, backup):
+ """Backup finished."""
+ logging.debug('_done_cb')
+# self._close_button.set_label(_('Finish'))
- for name, value in metadata.items():
- is_binary = False
- try:
- value.encode('utf-8')
- except UnicodeDecodeError:
- is_binary = True
+ def _error_cb(self, backup, message):
+ """Receive error message from child process."""
+ self._show_error(message)
- if is_binary or len(value) > 8192:
- logging.debug('adding binary/large property %r', name)
- bundle.writestr(os.path.join(object_id, str(name), object_id),
- value)
- del metadata[name]
+ def _show_error(self, message):
+ """Present error message to user."""
+ self._message_box.props.label = message
+ self._message_box.show()
- bundle.writestr(os.path.join(object_id, self._METADATA_JSON_NAME),
- json.dumps(metadata))
+# def _close_cb(self, button):
+# if not self._done:
+# self._backup.abort()
+
+# self.emit('close')
sugar.logger.start()