# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import time import shutil import gettext import logging import hashlib from cStringIO import StringIO from contextlib import contextmanager from ConfigParser import ConfigParser from os.path import join, isdir, exists from sugar_network import node, toolkit, model from sugar_network.node import stats_node, stats_user # pylint: disable-msg=W0611 from sugar_network.toolkit.router import route, preroute, postroute, ACL from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute from sugar_network.toolkit.spec import EMPTY_LICENSE from sugar_network.toolkit.spec import parse_requires, ensure_requires from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit import pylru, http, coroutine, exception, enforce _MAX_STAT_RECORDS = 128 _AUTH_POOL_SIZE = 1024 _logger = logging.getLogger('node.routes') class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): def __init__(self, guid, volume): model.VolumeRoutes.__init__(self, volume) model.FrontRoutes.__init__(self) volume.broadcast = self.broadcast self._guid = guid self._stats = None self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE) self._auth_config = None self._auth_config_mtime = 0 if stats_node.stats_node.value: self._stats = stats_node.Sniffer(volume) coroutine.spawn(self._commit_stats) def close(self): if self._stats is not None: self._stats.commit() @property def guid(self): return self._guid @route('GET', cmd='logon', acl=ACL.AUTH) def logon(self): pass @route('GET', cmd='whoami', mime_type='application/json') def whoami(self, request, response): roles = [] if self.authorize(request.principal, 'root'): roles.append('root') return {'roles': roles, 'guid': request.principal, 'route': 'direct'} @route('GET', cmd='status', mime_type='application/json') def status(self): documents = {} for name, directory in self.volume.items(): documents[name] = {'mtime': directory.mtime} return {'guid': self._guid, 'resources': documents} @route('GET', cmd='stats', arguments={ 'start': int, 'end': int, 'records': int, 'source': list}, mime_type='application/json') def stats(self, start, end, records, source): if not source or records <= 0 or start > end: return {} enforce(self._stats is not None, 'Node stats is disabled') if records > _MAX_STAT_RECORDS: _logger.debug('Decrease %d stats records number to %d', records, _MAX_STAT_RECORDS) records = _MAX_STAT_RECORDS resolution = (end - start) / records stats = {} for i in source: enforce('.' in i, 'Misnamed source') db_name, ds_name = i.split('.', 1) stats.setdefault(db_name, []).append(ds_name) return self._stats.report(stats, start, end, resolution) @route('POST', ['user'], mime_type='application/json') def register(self, request): # To avoid authentication while registering new user self.create(request) @fallbackroute('GET', ['packages']) def route_packages(self, request, response): enforce(node.files_root.value, http.BadRequest, 'Disabled') if request.path and request.path[-1] == 'updates': root = join(node.files_root.value, *request.path[:-1]) enforce(isdir(root), http.NotFound, 'Directory was not found') result = [] last_modified = 0 for filename in os.listdir(root): if '.' in filename: continue path = join(root, filename) mtime = int(os.stat(path).st_mtime) if mtime > request.if_modified_since: result.append(filename) last_modified = max(last_modified, mtime) response.content_type = 'application/json' if last_modified: response.last_modified = last_modified return result path = join(node.files_root.value, *request.path) enforce(exists(path), http.NotFound, 'File was not found') if not isdir(path): return toolkit.iter_file(path) result = [] for filename in os.listdir(path): if filename.endswith('.rpm') or filename.endswith('.deb'): continue result.append(filename) response.content_type = 'application/json' return result @route('POST', ['implementation'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) def submit_implementation(self, request, document): with toolkit.NamedTemporaryFile() as blob: shutil.copyfileobj(request.content_stream, blob) blob.flush() with load_bundle(self.volume, request, blob.name) as impl: impl['data']['blob'] = blob.name return impl['guid'] @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def delete(self, request): # Servers data should not be deleted immediately # to let master-slave synchronization possible request.call(method='PUT', path=request.path, content={'layer': ['deleted']}) @route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER) def attach(self, request): # TODO Reading layer here is a race directory = self.volume[request.resource] doc = directory.get(request.guid) layer = list(set(doc['layer']) | set(request.content)) directory.update(request.guid, {'layer': layer}) @route('PUT', [None, None], cmd='detach', acl=ACL.AUTH | ACL.SUPERUSER) def detach(self, request): # TODO Reading layer here is a race directory = self.volume[request.resource] doc = directory.get(request.guid) layer = list(set(doc['layer']) - set(request.content)) directory.update(request.guid, {'layer': layer}) @route('GET', ['context', None], cmd='clone', arguments={'requires': list}) def get_clone(self, request, response): return self._get_clone(request, response) @route('HEAD', ['context', None], cmd='clone', arguments={'requires': list}) def head_clone(self, request, response): self._get_clone(request, response) @route('GET', ['context', None], cmd='deplist', mime_type='application/json', arguments={'requires': list}) def deplist(self, request, repo): """List of native packages context is dependening on. Command return only GNU/Linux package names and ignores Sugar Network dependencies. :param repo: OBS repository name to get package names for, e.g., Fedora-14 :returns: list of package names """ enforce(repo, 'Argument %r should be set', 'repo') spec = self._solve(request).meta('data')['spec']['*-*'] common_deps = self.volume['context'].get(request.guid)['dependencies'] result = [] for package in set(spec.get('requires') or []) | set(common_deps): if package == 'sugar': continue dep = self.volume['context'].get(package) enforce(repo in dep['packages'], 'No packages for %r on %r', package, repo) result.extend(dep['packages'][repo].get('binary') or []) return result @route('GET', ['user', None], cmd='stats-info', mime_type='application/json', acl=ACL.AUTH) def user_stats_info(self, request): status = {} for rdb in stats_user.get_rrd(request.guid): status[rdb.name] = rdb.last + stats_user.stats_user_step.value # TODO Process client configuration in more general manner return {'enable': True, 'step': stats_user.stats_user_step.value, 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], 'status': status, } @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH) def user_stats_upload(self, request): name = request.content['name'] values = request.content['values'] rrd = stats_user.get_rrd(request.guid) for timestamp, values in values: rrd[name].put(values, timestamp) @route('GET', ['report', None], cmd='log', mime_type='text/html') def log(self, request): # In further implementations, `data` might be a tarball data = self.volume[request.resource].get(request.guid).meta('data') if data and 'blob' in data: return file(data['blob'], 'rb') else: return '' @preroute def preroute(self, op, request, response): if op.acl & ACL.AUTH and request.principal is None: if not request.authorization: enforce(self.authorize(None, 'user'), Unauthorized, 'No credentials') else: if request.authorization not in self._auth_pool: self.authenticate(request.authorization) self._auth_pool[request.authorization] = True enforce(not request.authorization.nonce or request.authorization.nonce >= time.time(), Unauthorized, 'Credentials expired') request.principal = request.authorization.login if op.acl & ACL.AUTHOR and request.guid: if request.resource == 'user': allowed = (request.principal == request.guid) else: doc = self.volume[request.resource].get(request.guid) allowed = (request.principal in doc['author']) enforce(allowed or self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for authors') if op.acl & ACL.SUPERUSER: enforce(self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for superusers') @postroute def postroute(self, request, response, result, error): if error is None or isinstance(error, http.StatusPass): if self._stats is not None: self._stats.log(request) def on_create(self, request, props, event): if request.resource == 'user': with file(props['pubkey']['blob']) as f: props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) model.VolumeRoutes.on_create(self, request, props, event) def on_update(self, request, props, event): model.VolumeRoutes.on_update(self, request, props, event) if 'deleted' in props.get('layer', []): event['event'] = 'delete' def find(self, request, reply): limit = request.get('limit') if limit is None or limit < 0: request['limit'] = node.find_limit.value elif limit > node.find_limit.value: _logger.warning('The find limit is restricted to %s', node.find_limit.value) request['limit'] = node.find_limit.value layer = request.setdefault('layer', []) if 'deleted' in layer: _logger.warning('Requesting "deleted" layer') layer.remove('deleted') request.add('not_layer', 'deleted') return model.VolumeRoutes.find(self, request, reply) def get(self, request, reply): doc = self.volume[request.resource].get(request.guid) enforce('deleted' not in doc['layer'], http.NotFound, 'Resource deleted') return model.VolumeRoutes.get(self, request, reply) def authenticate(self, auth): enforce(auth.scheme == 'sugar', http.BadRequest, 'Unknown authentication scheme') if not self.volume['user'].exists(auth.login): raise Unauthorized('Principal does not exist', auth.nonce) from M2Crypto import RSA data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() key = RSA.load_pub_key(self.volume['user'].path(auth.login, 'pubkey')) enforce(key.verify(data, auth.signature.decode('hex')), http.Forbidden, 'Bad credentials') def authorize(self, user, role): if role == 'user' and user: return True config_path = join(node.data_root.value, 'authorization.conf') if exists(config_path): mtime = os.stat(config_path).st_mtime if mtime > self._auth_config_mtime: self._auth_config_mtime = mtime self._auth_config = ConfigParser() self._auth_config.read(config_path) if self._auth_config is None: return False if not user: user = 'anonymous' if not self._auth_config.has_section(user): user = 'DEFAULT' if self._auth_config.has_option(user, role): return self._auth_config.get(user, role).strip().lower() in \ ('true', 'on', '1', 'allow') def _commit_stats(self): while True: coroutine.sleep(stats_node.stats_node_step.value) self._stats.commit() def _solve(self, request): requires = {} if 'requires' in request: for i in request['requires']: requires.update(parse_requires(i)) request.pop('requires') else: request['limit'] = 1 if 'stability' not in request: request['stability'] = 'stable' impls, __ = self.volume['implementation'].find( context=request.guid, order_by='-version', not_layer='deleted', **request) impl = None for impl in impls: if requires: impl_deps = impl.meta('data')['spec']['*-*']['requires'] if not ensure_requires(impl_deps, requires): continue break else: raise http.NotFound('No implementations found') return impl def _get_clone(self, request, response): impl = self._solve(request) result = request.call(method=request.method, path=['implementation', impl['guid'], 'data'], response=response) response.meta = impl.properties([ 'guid', 'ctime', 'layer', 'author', 'tags', 'context', 'version', 'stability', 'license', 'notes', ]) response.meta['data'] = data = impl.meta('data') for key in ('mtime', 'seqno', 'blob'): if key in data: del data[key] return result @contextmanager def load_bundle(volume, request, bundle_path): impl = request.copy() initial = False if 'initial' in impl: initial = impl.pop('initial') data = impl.setdefault('data', {}) contexts = volume['context'] context = impl.get('context') context_meta = None impls = volume['implementation'] try: bundle = Bundle(bundle_path, mime_type='application/zip') except Exception: _logger.debug('Load unrecognized bundle from %r', bundle_path) context_type = 'content' else: _logger.debug('Load Sugar Activity bundle from %r', bundle_path) context_type = 'activity' unpack_size = 0 with bundle: for arcname in bundle.get_names(): unpack_size += bundle.getmember(arcname).size spec = bundle.get_spec() context_meta = _load_context_metadata(bundle, spec) if 'requires' in impl: spec.requires.update(parse_requires(impl.pop('requires'))) context = impl['context'] = spec['context'] impl['version'] = spec['version'] impl['stability'] = spec['stability'] if spec['license'] is not EMPTY_LICENSE: impl['license'] = spec['license'] data['spec'] = {'*-*': { 'commands': spec.commands, 'requires': spec.requires, }} data['unpack_size'] = unpack_size data['mime_type'] = 'application/vnd.olpc-sugar' if initial and not contexts.exists(context): context_meta['guid'] = context context_meta['type'] = 'activity' request.call(method='POST', path=['context'], content=context_meta) context_meta = None enforce(context, 'Context is not specified') enforce('version' in impl, 'Version is not specified') enforce(context_type in contexts.get(context)['type'], http.BadRequest, 'Inappropriate bundle type') if 'license' not in impl: existing, total = impls.find( context=context, order_by='-version', not_layer='deleted') enforce(total, 'License is not specified') impl['license'] = next(existing)['license'] digest = hashlib.sha1() with file(bundle_path, 'rb') as f: while True: chunk = f.read(toolkit.BUFFER_SIZE) if not chunk: break digest.update(chunk) data['digest'] = digest.hexdigest() yield impl existing, __ = impls.find( context=context, version=impl['version'], not_layer='deleted') if 'url' not in data: data['blob'] = bundle_path impl['guid'] = \ request.call(method='POST', path=['implementation'], content=impl) for i in existing: layer = i['layer'] + ['deleted'] impls.update(i.guid, {'layer': layer}) if 'origin' in impls.get(impl['guid']).layer: diff = contexts.patch(context, context_meta) if diff: request.call(method='PUT', path=['context', context], content=diff) def _load_context_metadata(bundle, spec): result = {} for prop in ('homepage', 'mime_types'): if spec[prop]: result[prop] = spec[prop] try: icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon'])) icon = StringIO(icon_file.read()) icon_file.close() result.update({ 'artifact_icon': { 'blob': icon, 'mime_type': 'image/svg+xml', 'digest': hashlib.sha1(icon.getvalue()).hexdigest(), }, 'preview': _svg_to_png(icon.getvalue(), 160, 120), 'icon': _svg_to_png(icon.getvalue(), 55, 55), }) except Exception: exception(_logger, 'Failed to load icon') msgids = {} for prop, confname in [ ('title', 'name'), ('summary', 'summary'), ('description', 'description'), ]: if spec[confname]: msgids[prop] = spec[confname] result[prop] = {'en': spec[confname]} with toolkit.mkdtemp() as tmpdir: for path in bundle.get_names(): if not path.endswith('.mo'): continue mo_path = path.strip(os.sep).split(os.sep) if len(mo_path) != 5 or mo_path[1] != 'locale': continue lang = mo_path[2] bundle.extract(path, tmpdir) try: i18n = gettext.translation(spec['context'], join(tmpdir, *mo_path[:2]), [lang]) for prop, value in msgids.items(): msgstr = i18n.gettext(value).decode('utf8') if lang == 'en' or msgstr != value: result[prop][lang] = msgstr except Exception: exception(_logger, 'Gettext failed to read %r', mo_path[-1]) return result def _svg_to_png(data, w, h): import rsvg import cairo svg = rsvg.Handle(data=data) surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, w, h) context = cairo.Context(surface) scale = min(float(w) / svg.props.width, float(h) / svg.props.height) context.translate( int(w - svg.props.width * scale) / 2, int(h - svg.props.height * scale) / 2) context.scale(scale, scale) svg.render_cairo(context) result = StringIO() surface.write_to_png(result) result.seek(0) return {'blob': result, 'mime_type': 'image/png', 'digest': hashlib.sha1(result.getvalue()).hexdigest(), }