#!/usr/bin/env python

# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/.

"""Sugar Network node server launcher."""

import os
import locale
import gettext
import logging
from os.path import exists, join, isabs

from sugar_network.toolkit import coroutine
# Kept ahead of the remaining sugar_network imports, as originally ordered.
coroutine.inject()

from sugar_network import db, node, toolkit
from sugar_network.node.auth import SugarAuth, RootAuth
from sugar_network.node import obs, master, slave, model, stats
from sugar_network.toolkit.http import Connection
from sugar_network.toolkit.router import Router
from sugar_network.toolkit import application, Option, enforce


backdoor = Option(
        'path to a UNIX socket to serve administrative API requests; '
        'the entry point is not authenticated and assumes root privileges, '
        'thus, make sure that --backdoor path is accessible only by admins; '
        'not absolute path will be prefixed by the --rundir',
        default='backdoor', name='backdoor')

http_logdir = Option(
        'path to a directory to log HTTP requests; '
        'not absolute path will be prefixed by the --logdir',
        name='http-logdir')


gettext.textdomain('sugar-network')


class Application(application.Daemon):
    """Daemon that serves the node API over HTTP and an admin UNIX socket."""

    # Pool that owns every background job and server loop of the daemon.
    jobs = coroutine.Pool()
    # WSGI servers started by run(); reopen_logs() walks this list.
    servers = []

    def prolog(self):
        """Create the data root and resolve directory/socket option values.

        Unset cache, log and run directories default to sub-directories of
        the data root; relative --http-logdir and --backdoor values are
        anchored to the log and run directories respectively.
        """
        if not exists(node.data_root.value):
            os.makedirs(node.data_root.value)
        enforce(os.access(node.data_root.value, os.W_OK),
                'No write access to %r directory', node.data_root.value)

        for opt, dirname in [
                (toolkit.cachedir, 'cache'),
                (application.logdir, 'log'),
                (application.rundir, 'run'),
                ]:
            if not opt.value:
                opt.value = join(node.data_root.value, dirname)
            if not exists(opt.value):
                os.makedirs(opt.value)

        if http_logdir.value and not isabs(http_logdir.value):
            http_logdir.value = \
                    join(application.logdir.value, http_logdir.value)
        if not isabs(backdoor.value):
            backdoor.value = join(application.rundir.value, backdoor.value)

    def run(self):
        """Start the public and back-door servers and wait for the jobs.

        On exit the collected statistics are committed (when enabled), the
        volume is closed and the back-door socket file is removed.
        """
        enforce(node.master_api.value, 'Option --master-api missed')

        ssl_args = {}
        if node.keyfile.value:
            ssl_args['keyfile'] = node.keyfile.value
        if node.certfile.value:
            ssl_args['certfile'] = node.certfile.value

        if node.mode.value == 'master':
            node_routes_class = master.MasterRoutes
            resources = master.RESOURCES
            logging.info('Start master node')
        else:
            node_routes_class = slave.SlaveRoutes
            resources = slave.RESOURCES
            logging.info('Start slave node')

        volume = model.Volume(node.data_root.value, resources)
        public_routes = node_routes_class(node.master_api.value,
                volume=volume, auth=SugarAuth(node.data_root.value),
                find_limit=node.find_limit.value)
        self.jobs.spawn(volume.populate)
        self.jobs.spawn(model.presolve, join(node.data_root.value, 'files'))

        if stats.stats.value:
            public_routes.stats_init(join(node.data_root.value, 'var'),
                    stats.stats_step.value, stats.stats_rras.value)
            self.jobs.spawn(public_routes.stats_auto_commit)

        logging.info('Listen requests on %s:%s',
                node.host.value, node.port.value)
        server = coroutine.WSGIServer(
                (node.host.value, node.port.value), Router(public_routes),
                http_log=_open_logfile('access'), **ssl_args)
        self.jobs.spawn(server.serve_forever)
        self.servers.append(server)

        logging.info('Listen admin requests on %s', backdoor.value)
        sock = coroutine.listen_unix_socket(backdoor.value,
                reuse_address=True, mode=0o660)
        # A separate routes object with root privileges serves the socket;
        # it must not replace the public one, which owns the statistics.
        admin_routes = node_routes_class(node.master_api.value,
                volume=volume, auth=RootAuth())
        server = coroutine.WSGIServer(sock, Router(admin_routes),
                http_log=_open_logfile('backdoor'))
        self.jobs.spawn(server.serve_forever)
        self.servers.append(server)

        self.accept()
        try:
            self.jobs.join()
        finally:
            try:
                if stats.stats.value:
                    # Commit on the object stats_init() was called for.
                    public_routes.stats_commit()
            finally:
                # Release the volume and socket even if the commit failed.
                volume.close()
                os.unlink(backdoor.value)

    def shutdown(self):
        """Stop the daemon by killing every spawned job."""
        self.jobs.kill()

    def reopen_logs(self):
        """Reopen daemon logs and, if enabled, the per-server HTTP logs."""
        application.Daemon.reopen_logs(self)
        if not http_logdir.value:
            return
        for server in self.servers:
            if server.http_log is None:
                continue
            server.http_log.close()
            # The closed file object still exposes the path it was opened by.
            server.http_log = open(server.http_log.name, 'a+')

    @application.command(
            'direct synchronization with master node',
            name='online-sync')
    def online_sync(self):
        """Ask the running slave node to synchronize with its master."""
        enforce(node.mode.value == 'slave', 'Node is not slave')
        self._ensure_instance().post(cmd='online_sync')

    @application.command(
            'sneakernet synchronization with other nodes using files '
            'placed to the specified directory',
            args='PATH', name='offline-sync')
    def offline_sync(self):
        """Ask the running slave node to synchronize via files in PATH."""
        enforce(node.mode.value == 'slave', 'Node is not slave')
        enforce(self.args, 'PATH was not specified')
        path = self.args.pop(0)
        self._ensure_instance().post(cmd='offline_sync', path=path)

    @application.command(
            'resolve/presolve packages for all package-type Contexts',
            name='resolve')
    def resolve(self):
        """Resolve packages of every package-type context, then presolve.

        Requires a stopped node running in master mode.
        """
        enforce(not self.check_for_instance(), 'Node should be stopped')
        enforce(node.mode.value == 'master', 'Node is not master')

        volume = model.Volume(node.data_root.value, master.RESOURCES)
        try:
            volume.populate()
            for doc in volume['context']:
                if 'package' not in doc['type']:
                    continue
                for key, agg in doc['releases'].items():
                    if key == 'resolves' or 'value' not in agg:
                        continue
                    model.resolve_package(doc, key, agg['value'])
                doc.post('releases')
        finally:
            volume.close()

        model.presolve(join(node.data_root.value, 'files'), block=False)

    @application.command(
            're-generate node statistics',
            name='restat')
    def restat(self):
        """Regenerate node statistics; the node must be stopped."""
        enforce(not self.check_for_instance(), 'Node should be stopped')
        volume = model.Volume(node.data_root.value, master.RESOURCES)
        routes = slave.SlaveRoutes(node.master_api.value, volume=volume,
                auth=RootAuth())
        try:
            volume.populate()
            routes.stats_regen(join(node.data_root.value, 'var'),
                    stats.stats_step.value, stats.stats_rras.value)
        finally:
            volume.close()

    @application.command(
            're-generate ratings',
            name='rating')
    def rating(self):
        """Regenerate ratings; the node must be stopped."""
        enforce(not self.check_for_instance(), 'Node should be stopped')
        volume = model.Volume(node.data_root.value, master.RESOURCES)
        routes = slave.SlaveRoutes(node.master_api.value, volume=volume,
                auth=RootAuth())
        try:
            volume.populate()
            routes.stats_regen_rating(join(node.data_root.value, 'var'),
                    stats.stats_step.value, stats.stats_rras.value)
        finally:
            volume.close()

    def _ensure_instance(self):
        """Return a connection to the running node's back-door socket.

        Fails via enforce() when no node instance is running.
        """
        enforce(self.check_for_instance(), 'Node is not started')
        return Connection('file://' + backdoor.value)


def _open_logfile(name):
    """Open `<http-logdir>/<name>.log` for appending.

    Returns None when --http-logdir is not set; the directory is created
    on demand.
    """
    if not http_logdir.value:
        return None
    if not exists(http_logdir.value):
        os.makedirs(http_logdir.value)
    return open(join(http_logdir.value, name + '.log'), 'a+')


locale.setlocale(locale.LC_ALL, '')

# New defaults
application.logdir.value = None
application.rundir.value = None

Option.seek('main', application)
Option.seek('main', [toolkit.cachedir])
Option.seek('node', node)
Option.seek('node', [http_logdir])
Option.seek('node', stats)
Option.seek('obs', obs)
Option.seek('db', db)

app = Application(
        name='sugar-network-node',
        description='Sugar Network node server',
        epilog='See http://wiki.sugarlabs.org/go/Sugar_Network '
               'for details.',
        config_files=[
            '/etc/sugar-network.d',
            '/etc/sugar-network.conf',
            '~/.config/sugar-network/config',
            ])
app.start()