# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import time import shutil import gettext import logging import hashlib from cStringIO import StringIO from contextlib import contextmanager from ConfigParser import ConfigParser from os.path import join, isdir, exists from sugar_network import node, toolkit, model from sugar_network.node import stats_node, stats_user # pylint: disable-msg=W0611 from sugar_network.toolkit.router import route, preroute, postroute, ACL from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute from sugar_network.toolkit.spec import EMPTY_LICENSE from sugar_network.toolkit.spec import parse_requires, ensure_requires from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit import pylru, http, coroutine, exception, enforce _MAX_STAT_RECORDS = 100 _AUTH_POOL_SIZE = 1024 _logger = logging.getLogger('node.routes') class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): def __init__(self, guid, volume): model.VolumeRoutes.__init__(self, volume) model.FrontRoutes.__init__(self) volume.broadcast = self.broadcast self._guid = guid self._stats = None self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE) self._auth_config = None self._auth_config_mtime = 0 if stats_node.stats_node.value: stats_path = join(node.stats_root.value, 'node') self._stats = stats_node.Sniffer(volume, stats_path) coroutine.spawn(self._commit_stats) def close(self): if self._stats is not None: self._stats.suspend() @property def guid(self): return self._guid @route('GET', cmd='logon', acl=ACL.AUTH) def logon(self): pass @route('GET', cmd='whoami', mime_type='application/json') def whoami(self, request, response): roles = [] if self.authorize(request.principal, 'root'): roles.append('root') return {'roles': roles, 'guid': request.principal, 'route': 'direct'} @route('GET', cmd='status', mime_type='application/json') def status(self): documents = {} for name, directory in self.volume.items(): documents[name] = {'mtime': directory.mtime} return {'guid': self._guid, 'resources': documents} @route('GET', cmd='stats', arguments={ 'start': int, 'end': int, 'records': int, 'source': list}, mime_type='application/json') def stats(self, start, end, records, source): enforce(self._stats is not None, 'Node stats is disabled') if not source: return {} if records > _MAX_STAT_RECORDS: _logger.debug('Decrease %d stats records number to %d', records, _MAX_STAT_RECORDS) records = _MAX_STAT_RECORDS elif records <= 0: records = _MAX_STAT_RECORDS / 10 stats = {} for i in source: enforce('.' in i, 'Misnamed source') db_name, ds_name = i.split('.', 1) stats.setdefault(db_name, []).append(ds_name) return self._stats.report(stats, start, end, records) @route('POST', ['user'], mime_type='application/json') def register(self, request): # To avoid authentication while registering new user self.create(request) @fallbackroute('GET', ['packages']) def route_packages(self, request, response): enforce(node.files_root.value, http.BadRequest, 'Disabled') if request.path and request.path[-1] == 'updates': root = join(node.files_root.value, *request.path[:-1]) enforce(isdir(root), http.NotFound, 'Directory was not found') result = [] last_modified = 0 for filename in os.listdir(root): if '.' in filename: continue path = join(root, filename) mtime = int(os.stat(path).st_mtime) if mtime > request.if_modified_since: result.append(filename) last_modified = max(last_modified, mtime) response.content_type = 'application/json' if last_modified: response.last_modified = last_modified return result path = join(node.files_root.value, *request.path) enforce(exists(path), http.NotFound, 'File was not found') if not isdir(path): return toolkit.iter_file(path) result = [] for filename in os.listdir(path): if filename.endswith('.rpm') or filename.endswith('.deb'): continue result.append(filename) response.content_type = 'application/json' return result @route('POST', ['implementation'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) def submit_implementation(self, request, document): with toolkit.NamedTemporaryFile() as blob: shutil.copyfileobj(request.content_stream, blob) blob.flush() with load_bundle(self.volume, request, blob.name) as impl: impl['data']['blob'] = blob.name return impl['guid'] @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def delete(self, request): # Servers data should not be deleted immediately # to let master-slave synchronization possible request.call(method='PUT', path=request.path, content={'layer': ['deleted']}) @route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER) def attach(self, request): # TODO Reading layer here is a race directory = self.volume[request.resource] doc = directory.get(request.guid) layer = list(set(doc['layer']) | set(request.content)) directory.update(request.guid, {'layer': layer}) @route('PUT', [None, None], cmd='detach', acl=ACL.AUTH | ACL.SUPERUSER) def detach(self, request): # TODO Reading layer here is a race directory = self.volume[request.resource] doc = directory.get(request.guid) layer = list(set(doc['layer']) - set(request.content)) directory.update(request.guid, {'layer': layer}) @route('GET', ['context', None], cmd='clone', arguments={'requires': list}) def get_clone(self, request, response): return self._get_clone(request, response) @route('HEAD', ['context', None], cmd='clone', arguments={'requires': list}) def head_clone(self, request, response): self._get_clone(request, response) @route('GET', ['context', None], cmd='deplist', mime_type='application/json', arguments={'requires': list}) def deplist(self, request, repo): """List of native packages context is dependening on. Command return only GNU/Linux package names and ignores Sugar Network dependencies. :param repo: OBS repository name to get package names for, e.g., Fedora-14 :returns: list of package names """ enforce(repo, 'Argument %r should be set', 'repo') spec = self._solve(request).meta('data')['spec']['*-*'] common_deps = self.volume['context'].get(request.guid)['dependencies'] result = [] for package in set(spec.get('requires') or []) | set(common_deps): if package == 'sugar': continue dep = self.volume['context'].get(package) enforce(repo in dep['packages'], 'No packages for %r on %r', package, repo) result.extend(dep['packages'][repo].get('binary') or []) return result @route('GET', ['user', None], cmd='stats-info', mime_type='application/json', acl=ACL.AUTH) def user_stats_info(self, request): status = {} for rdb in stats_user.get_rrd(request.guid): status[rdb.name] = rdb.last + stats_user.stats_user_step.value # TODO Process client configuration in more general manner return {'enable': True, 'step': stats_user.stats_user_step.value, 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], 'status': status, } @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH) def user_stats_upload(self, request): name = request.content['name'] values = request.content['values'] rrd = stats_user.get_rrd(request.guid) for timestamp, values in values: rrd[name].put(values, timestamp) @route('GET', ['report', None], cmd='log', mime_type='text/html') def log(self, request): # In further implementations, `data` might be a tarball data = self.volume[request.resource].get(request.guid).meta('data') if data and 'blob' in data: return file(data['blob'], 'rb') else: return '' @preroute def preroute(self, op, request, response): if op.acl & ACL.AUTH and request.principal is None: if not request.authorization: enforce(self.authorize(None, 'user'), Unauthorized, 'No credentials') else: if request.authorization not in self._auth_pool: self.authenticate(request.authorization) self._auth_pool[request.authorization] = True enforce(not request.authorization.nonce or request.authorization.nonce >= time.time(), Unauthorized, 'Credentials expired') request.principal = request.authorization.login if op.acl & ACL.AUTHOR and request.guid: if request.resource == 'user': allowed = (request.principal == request.guid) else: doc = self.volume[request.resource].get(request.guid) allowed = (request.principal in doc['author']) enforce(allowed or self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for authors') if op.acl & ACL.SUPERUSER: enforce(self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for superusers') @postroute def postroute(self, request, response, result, error): if error is None or isinstance(error, http.StatusPass): if self._stats is not None: self._stats.log(request) def on_create(self, request, props, event): if request.resource == 'user': with file(props['pubkey']['blob']) as f: props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) model.VolumeRoutes.on_create(self, request, props, event) def on_update(self, request, props, event): model.VolumeRoutes.on_update(self, request, props, event) if 'deleted' in props.get('layer', []): event['event'] = 'delete' def find(self, request, reply): limit = request.get('limit') if limit is None or limit < 0: request['limit'] = node.find_limit.value elif limit > node.find_limit.value: _logger.warning('The find limit is restricted to %s', node.find_limit.value) request['limit'] = node.find_limit.value layer = request.setdefault('layer', []) if 'deleted' in layer: _logger.warning('Requesting "deleted" layer') layer.remove('deleted') request.add('not_layer', 'deleted') return model.VolumeRoutes.find(self, request, reply) def get(self, request, reply): doc = self.volume[request.resource].get(request.guid) enforce('deleted' not in doc['layer'], http.NotFound, 'Resource deleted') return model.VolumeRoutes.get(self, request, reply) def authenticate(self, auth): enforce(auth.scheme == 'sugar', http.BadRequest, 'Unknown authentication scheme') if not self.volume['user'].exists(auth.login): raise Unauthorized('Principal does not exist', auth.nonce) from M2Crypto import RSA data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() key = RSA.load_pub_key(self.volume['user'].path(auth.login, 'pubkey')) enforce(key.verify(data, auth.signature.decode('hex')), http.Forbidden, 'Bad credentials') def authorize(self, user, role): if role == 'user' and user: return True config_path = join(node.data_root.value, 'authorization.conf') if exists(config_path): mtime = os.stat(config_path).st_mtime if mtime > self._auth_config_mtime: self._auth_config_mtime = mtime self._auth_config = ConfigParser() self._auth_config.read(config_path) if self._auth_config is None: return False if not user: user = 'anonymous' if not self._auth_config.has_section(user): user = 'DEFAULT' if self._auth_config.has_option(user, role): return self._auth_config.get(user, role).strip().lower() in \ ('true', 'on', '1', 'allow') def _commit_stats(self): while True: coroutine.sleep(stats_node.stats_node_step.value) self._stats.commit() def _solve(self, request): requires = {} if 'requires' in request: for i in request['requires']: requires.update(parse_requires(i)) request.pop('requires') else: request['limit'] = 1 if 'stability' not in request: request['stability'] = 'stable' impls, __ = self.volume['implementation'].find( context=request.guid, order_by='-version', not_layer='deleted', **request) impl = None for impl in impls: if requires: impl_deps = impl.meta('data')['spec']['*-*']['requires'] if not ensure_requires(impl_deps, requires): continue break else: raise http.NotFound('No implementations found') return impl def _get_clone(self, request, response): impl = self._solve(request) result = request.call(method=request.method, path=['implementation', impl['guid'], 'data'], response=response) response.meta = impl.properties([ 'guid', 'ctime', 'layer', 'author', 'tags', 'context', 'version', 'stability', 'license', 'notes', ]) response.meta['data'] = data = impl.meta('data') for key in ('mtime', 'seqno', 'blob'): if key in data: del data[key] return result def generate_node_stats(volume, path): tmp_path = toolkit.mkdtemp() new_stats = stats_node.Sniffer(volume, tmp_path, True) old_stats = stats_node.Sniffer(volume, path) def timeline(ts): ts = long(ts) end = long(time.time()) step = None archives = {} for rra in stats_node.stats_node_rras.value: a_step, a_size = [long(i) for i in rra.split(':')[-2:]] a_step *= stats_node.stats_node_step.value a_start = end - min(end, a_step * a_size) if archives.setdefault(a_start, a_step) > a_step: archives[a_start] = a_step archives = list(sorted(archives.items())) try: while ts <= end: while not step or archives and ts >= archives[0][0]: archive_start, step = archives.pop(0) ts = max(ts / step * step, archive_start) yield ts, ts + step - 1, step ts += step except GeneratorExit: shutil.rmtree(tmp_path, ignore_errors=True) start = next(volume['context'].find(limit=1, order_by='ctime')[0])['ctime'] for left, right, step in timeline(start): for resource, props in [ ('user', []), ('context', []), ('implementation', ['context']), ('artifact', ['context']), ('feedback', ['context']), ('solution', ['context', 'feedback']), ('review', ['context', 'artifact', 'rating']), ('report', ['context', 'implementation']), ('comment', ['context', 'review', 'feedback', 'solution']), ]: objs, __ = volume[resource].find( query='ctime:%s..%s' % (left, right)) for obj in objs: request = Request(method='POST', path=[resource], content=obj.properties(props)) new_stats.log(request) for resource, props in [ ('user', ['layer']), ('context', ['layer']), ('implementation', ['layer']), ('artifact', ['layer']), ('feedback', ['layer', 'solution']), ('solution', ['layer']), ('review', ['layer']), ('report', ['layer']), ('comment', ['layer']), ]: objs, __ = volume[resource].find( query='mtime:%s..%s' % (left, right)) for obj in objs: if 'deleted' in obj['layer']: request = Request(method='DELETE', path=[resource, obj.guid]) else: request = Request(method='PUT', path=[resource, obj.guid], content=obj.properties(props)) new_stats.log(request) downloaded = {} for resource in ('context', 'artifact'): stats = old_stats.report( {resource: ['downloaded']}, left - step, right, 1) if not stats.get(resource): continue stats = stats[resource][-1][1].get('downloaded') if stats: downloaded[resource] = {'downloaded': stats} new_stats.commit(left + (right - left) / 2, downloaded) new_stats.commit_objects(True) shutil.rmtree(path) shutil.move(tmp_path, path) @contextmanager def load_bundle(volume, request, bundle_path): impl = request.copy() initial = False if 'initial' in impl: initial = impl.pop('initial') data = impl.setdefault('data', {}) contexts = volume['context'] context = impl.get('context') context_meta = None impls = volume['implementation'] try: bundle = Bundle(bundle_path, mime_type='application/zip') except Exception: _logger.debug('Load unrecognized bundle from %r', bundle_path) context_type = 'content' else: _logger.debug('Load Sugar Activity bundle from %r', bundle_path) context_type = 'activity' unpack_size = 0 with bundle: for arcname in bundle.get_names(): unpack_size += bundle.getmember(arcname).size spec = bundle.get_spec() context_meta = _load_context_metadata(bundle, spec) if 'requires' in impl: spec.requires.update(parse_requires(impl.pop('requires'))) context = impl['context'] = spec['context'] impl['version'] = spec['version'] impl['stability'] = spec['stability'] if spec['license'] is not EMPTY_LICENSE: impl['license'] = spec['license'] data['spec'] = {'*-*': { 'commands': spec.commands, 'requires': spec.requires, }} data['unpack_size'] = unpack_size data['mime_type'] = 'application/vnd.olpc-sugar' if initial and not contexts.exists(context): context_meta['guid'] = context context_meta['type'] = 'activity' request.call(method='POST', path=['context'], content=context_meta) context_meta = None enforce(context, 'Context is not specified') enforce('version' in impl, 'Version is not specified') enforce(context_type in contexts.get(context)['type'], http.BadRequest, 'Inappropriate bundle type') if 'license' not in impl: existing, total = impls.find( context=context, order_by='-version', not_layer='deleted') enforce(total, 'License is not specified') impl['license'] = next(existing)['license'] digest = hashlib.sha1() with file(bundle_path, 'rb') as f: while True: chunk = f.read(toolkit.BUFFER_SIZE) if not chunk: break digest.update(chunk) data['digest'] = digest.hexdigest() yield impl existing, __ = impls.find( context=context, version=impl['version'], not_layer='deleted') if 'url' not in data: data['blob'] = bundle_path impl['guid'] = \ request.call(method='POST', path=['implementation'], content=impl) for i in existing: layer = i['layer'] + ['deleted'] impls.update(i.guid, {'layer': layer}) if 'origin' in impls.get(impl['guid']).layer: diff = contexts.patch(context, context_meta) if diff: request.call(method='PUT', path=['context', context], content=diff) def _load_context_metadata(bundle, spec): def convert(data, w, h): result = toolkit.svg_to_png(data.getvalue(), w, h) return {'blob': result, 'mime_type': 'image/png', 'digest': hashlib.sha1(result.getvalue()).hexdigest(), } result = {} for prop in ('homepage', 'mime_types'): if spec[prop]: result[prop] = spec[prop] try: icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon'])) icon = StringIO(icon_file.read()) icon_file.close() result.update({ 'artifact_icon': { 'blob': icon, 'mime_type': 'image/svg+xml', 'digest': hashlib.sha1(icon.getvalue()).hexdigest(), }, 'preview': convert(icon, 160, 120), 'icon': convert(icon, 55, 55), }) except Exception: exception(_logger, 'Failed to load icon') msgids = {} for prop, confname in [ ('title', 'name'), ('summary', 'summary'), ('description', 'description'), ]: if spec[confname]: msgids[prop] = spec[confname] result[prop] = {'en': spec[confname]} with toolkit.mkdtemp() as tmpdir: for path in bundle.get_names(): if not path.endswith('.mo'): continue mo_path = path.strip(os.sep).split(os.sep) if len(mo_path) != 5 or mo_path[1] != 'locale': continue lang = mo_path[2] bundle.extract(path, tmpdir) try: i18n = gettext.translation(spec['context'], join(tmpdir, *mo_path[:2]), [lang]) for prop, value in msgids.items(): msgstr = i18n.gettext(value).decode('utf8') if lang == 'en' or msgstr != value: result[prop][lang] = msgstr except Exception: exception(_logger, 'Gettext failed to read %r', mo_path[-1]) return result