#!/usr/bin/env python # Copyright (C) 2012 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import sys import json import shlex import errno import signal import locale import logging from contextlib import contextmanager from os.path import join, abspath, exists import active_document as ad import sugar_network_webui as webui from sugar_network import toolkit, local, node from sugar_network.toolkit import sugar, sneakernet from sugar_network.toolkit import dbus_thread, mounts_monitor from sugar_network.local import activities, datastore from sugar_network.local.dbus_datastore import Datastore from sugar_network.local.dbus_network import Network from sugar_network.local.mounts import HomeMount, RemoteMount from sugar_network.local.mountset import Mountset from sugar_network.local.ipc_client import Router from sugar_network.node import stats from sugar_network.resources.volume import Volume from active_toolkit.options import Option from active_toolkit import util, printf, application, coroutine, enforce VERSION = '0.4' class NullHandler(logging.Handler): def emit(self, record): pass class Application(application.Application): _ipc_server = None _events = {} def __init__(self, **kwargs): application.Application.__init__(self, **kwargs) if application.debug.value < 3: for log_name in ( 'requests.packages.urllib3.connectionpool', 'requests.packages.urllib3.poolmanager', 'requests.packages.urllib3.response', 'requests.packages.urllib3', 'inotify', 'netlink', 'sneakernet', 'toolkit', ): logger = logging.getLogger(log_name) logger.propagate = False logger.addHandler(NullHandler()) new_root = (local.local_root.value != local.local_root.default) local.local_root.value = abspath(local.local_root.value) if new_root: application.logdir.value = join(local.local_root.value, 'log') else: application.logdir.value = sugar.profile_path('logs') application.rundir.value = join(local.local_root.value, 'run') if not exists(local.tmpdir.value): os.makedirs(local.tmpdir.value) sneakernet.TMPDIR = local.tmpdir.value coroutine.signal(signal.SIGCHLD, self.__SIGCHLD_cb) @application.command( 'index local Sugar Network database') def index(self): if self.check_for_instance(): printf.info('%s already started, no need in index', self.name) return if not exists(sugar.profile_path('owner.key')): # Command was launched in foreign environment sugar.uid = lambda: 'index' sugar.nickname = lambda: 'index' printf.info('Index database in %r', local.local_root.value) volume = Volume(self._db_path) try: volume.populate() activities.populate(volume['context'], local.activity_dirs.value) finally: volume.close() @application.command( 'start sneakernet synchronization; if PATH is specified, ' 'use it as a synchronization directory; otherwise, ' 'look for mounts (in --mounts-root) that contain ' 'sugar-network-sync/ subdirectory', args='[PATH]') def offline_sync(self): with self._rendezvous(): path = None if self.args: path = self.args.pop(0) Client.call('POST', cmd='start_sync', rewind=True, path=path) self._events['sync_complete'].wait() @application.command(hidden=True) def POST(self): self._call('POST', sys.stdin.read()) @application.command(hidden=True) def PUT(self): self._call('PUT', sys.stdin.read()) @application.command(hidden=True) def GET(self): result = self._call('GET', None) if type(result) in (list, set, tuple): for i in result: print i elif type(result) is dict: print json.dumps(result, indent=2) else: print result @application.command( 'start service and log to standard output') def debug(self): self._start() @application.command( 'start service and log to files', name='start', keep_stdout=True) def _start(self): if self.check_for_instance(): printf.info('%s is already run', self.name) exit(1) jobs = coroutine.Pool() mountset = self._mountset() def delayed_start(event=None): logging.info('Proceed delayed start') mountset.disconnect(delayed_start) mountset.volume.populate() datastore.populate(mountset.volume['artifact']) self._sync(mountset.volume) logging.info('Listening for IPC requests on %s port', local.ipc_port.value) server = coroutine.WSGIServer(('localhost', local.ipc_port.value), Router(mountset)) jobs.spawn(server.serve_forever) jobs.spawn(activities.monitor, mountset.volume['context'], local.activity_dirs.value) if webui.webui.value: host = (webui.webui_host.value, webui.webui_port.value) logging.info('Start Web server on %s:%s', *host) server = coroutine.WSGIServer(host, webui.get_app(mountset)) jobs.spawn(server.serve_forever) if local.mounts_root.value: mounts_monitor.start(abspath(local.mounts_root.value)) if local.delayed_start.value: mountset.connect(delayed_start, event='delayed-start') else: delayed_start() dbus_thread.spawn_service(Datastore) dbus_thread.spawn_service(Network) pid_path = self.new_instance() try: mountset.open() dbus_thread.start(mountset) except KeyboardInterrupt: util.exception('%s interrupted', self.name) except Exception: util.exception('%s aborted', self.name) finally: mounts_monitor.stop() jobs.kill() mountset.close() os.unlink(pid_path) def _mountset(self): if local.anonymous.value: sugar.uid = lambda: 'anonymous' sugar.nickname = lambda: 'anonymous' sugar.color = lambda: '#000000,#000000' else: # In case if it is new Sugar Shell profile toolkit.ensure_dsa_pubkey(sugar.profile_path('owner.key')) volume = Volume(self._db_path, lazy_open=local.lazy_open.value) mountset = Mountset(volume) mountset['~'] = HomeMount(volume) mountset['/'] = RemoteMount(volume) return mountset @contextmanager def _rendezvous(self): def events_cb(event): if event['event'] == 'sync_start': printf.info('Synchronize with %(path)s directory' % event) elif event['event'] == 'sync_progress': printf.progress(event['progress']) elif event['event'] == 'sync_continue': printf.info('Mounted synchronization disk(s) is full, ' \ 'mount new one to %s', local.mounts_root.value) elif event['event'] == 'sync_error': printf.info('Failed to sync, %(error)s' % event) elif event['event'] == 'sync_complete': self._events['sync_complete'].set() self._events['sync_complete'] = coroutine.Event() pid_path = None mountset = None try: if not self.check_for_instance(): printf.info('%s is not started, ' \ 'launch it for this command only', self.name) pid_path = self.new_instance() mountset = self._mountset() dbus_thread.spawn_service(Network) coroutine.spawn(dbus_thread.start, mountset) coroutine.dispatch() mountset.opened.wait() Client.connect(events_cb) yield finally: if mountset is not None: mountset.close() os.unlink(pid_path) def _call(self, method, content=None): kwargs = {} for arg in self.args: pair = shlex.split(arg) if not pair: continue pair = pair[0] enforce('=' in pair, 'No "=" assign symbol in %r expression', arg) arg, value = pair.split('=', 1) arg = arg.strip() enforce(arg, 'No argument name in %r expression', arg) if arg in kwargs: if isinstance(kwargs[arg], basestring): kwargs[arg] = [kwargs[arg]] kwargs[arg].append(value) else: kwargs[arg] = value with self._rendezvous(): return Client.call(method, content=content, **kwargs) @property def _db_path(self): return join(local.local_root.value, 'local') def __SIGCHLD_cb(self): while True: try: pid, __ = os.waitpid(-1, os.WNOHANG) if pid: continue except OSError, error: if error.errno != errno.ECHILD: raise break def _sync(self, volume): contexts = volume['context'] docs, __ = contexts.find(limit=ad.MAX_LIMIT, keep_impl=[1, 2]) for context in docs: if not activities.ensure_checkins(context.guid): contexts.update(context.guid, keep_impl=0) locale.setlocale(locale.LC_ALL, '') # New defaults application.debug.value = sugar.logger_level() # It seems to be that most of users (on XO at least) don't have recent SSH node.trust_users.value = True # If tmpfs is mounted to /tmp, `os.fstat()` will return 0 free space # and will brake offline synchronization logic local.tmpdir.value = sugar.profile_path('tmp') Option.seek('main', [application.debug]) Option.seek('webui', webui) Option.seek('local', local) Option.seek('local', [sugar.keyfile]) Option.seek('node', [node.port, node.sync_dirs]) Option.seek('stats', stats) Option.seek('active-document', ad) application = Application( name='sugar-network-service', description='Sugar Network service.', epilog='See http://wiki.sugarlabs.org/go/Sugar_Network ' \ 'for details.', version=VERSION, config_files=[ '/etc/sweets.conf', '~/.config/sweets/config', sugar.profile_path('sweets.conf'), ]) application.start()