Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/node/routes.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/node/routes.py')
-rw-r--r--sugar_network/node/routes.py466
1 files changed, 58 insertions, 408 deletions
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index eb48c70..6323cbc 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,23 +15,21 @@
import os
import time
-import shutil
-import gettext
import logging
import hashlib
-from contextlib import contextmanager
from ConfigParser import ConfigParser
from os.path import join, isdir, exists
-from sugar_network import node, toolkit, model
-from sugar_network.node import stats_node, stats_user
-from sugar_network.model.context import Context
+from sugar_network import db, node, toolkit, model
+from sugar_network.db import files
+from sugar_network.node import stats_user
# pylint: disable-msg=W0611
from sugar_network.toolkit.router import route, preroute, postroute, ACL
from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute
-from sugar_network.toolkit.spec import EMPTY_LICENSE
from sugar_network.toolkit.spec import parse_requires, ensure_requires
+from sugar_network.toolkit.spec import parse_version
from sugar_network.toolkit.bundle import Bundle
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import pylru, http, coroutine, exception, enforce
@@ -41,28 +39,16 @@ _AUTH_POOL_SIZE = 1024
_logger = logging.getLogger('node.routes')
-class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
+class NodeRoutes(db.Routes, model.FrontRoutes):
- def __init__(self, guid, volume):
- model.VolumeRoutes.__init__(self, volume)
+ def __init__(self, guid, **kwargs):
+ db.Routes.__init__(self, **kwargs)
model.FrontRoutes.__init__(self)
- volume.broadcast = self.broadcast
-
self._guid = guid
- self._stats = None
self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE)
self._auth_config = None
self._auth_config_mtime = 0
- if stats_node.stats_node.value:
- stats_path = join(node.stats_root.value, 'node')
- self._stats = stats_node.Sniffer(volume, stats_path)
- coroutine.spawn(self._commit_stats)
-
- def close(self):
- if self._stats is not None:
- self._stats.suspend()
-
@property
def guid(self):
return self._guid
@@ -80,33 +66,12 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
@route('GET', cmd='status', mime_type='application/json')
def status(self):
- documents = {}
- for name, directory in self.volume.items():
- documents[name] = {'mtime': directory.mtime}
- return {'guid': self._guid, 'resources': documents}
-
- @route('GET', cmd='stats', arguments={
- 'start': int, 'end': int, 'records': int, 'source': list},
- mime_type='application/json')
- def stats(self, start, end, records, source):
- enforce(self._stats is not None, 'Node stats is disabled')
- if not source:
- return {}
-
- if records > _MAX_STAT_RECORDS:
- _logger.debug('Decrease %d stats records number to %d',
- records, _MAX_STAT_RECORDS)
- records = _MAX_STAT_RECORDS
- elif records <= 0:
- records = _MAX_STAT_RECORDS / 10
-
- stats = {}
- for i in source:
- enforce('.' in i, 'Misnamed source')
- db_name, ds_name = i.split('.', 1)
- stats.setdefault(db_name, []).append(ds_name)
-
- return self._stats.report(stats, start, end, records)
+ return {'guid': self._guid,
+ 'seqno': {
+ 'db': self.volume.seqno.value,
+ 'releases': self.volume.releases_seqno.value,
+ },
+ }
@route('POST', ['user'], mime_type='application/json')
def register(self, request):
@@ -149,23 +114,19 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
response.content_type = 'application/json'
return result
- @route('POST', ['release'], cmd='submit',
+ @route('POST', ['context'], cmd='submit',
arguments={'initial': False},
mime_type='application/json', acl=ACL.AUTH)
- def submit_release(self, request, document):
- with toolkit.NamedTemporaryFile() as blob:
- shutil.copyfileobj(request.content_stream, blob)
- blob.flush()
- with load_bundle(self.volume, request, blob.name) as impl:
- impl['data']['blob'] = blob.name
- return impl['guid']
-
- @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
- def delete(self, request):
- # Servers data should not be deleted immediately
- # to let master-slave synchronization possible
- request.call(method='PUT', path=request.path,
- content={'layer': ['deleted']})
+ def submit_release(self, request, initial):
+ blob = files.post(request.content_stream)
+ try:
+ context, release = model.load_bundle(blob, initial=initial)
+ except Exception:
+ files.delete(blob.digest)
+ raise
+ this.call(method='POST', path=['context', context, 'releases'],
+ content_type='application/json', content=release)
+ return blob.digest
@route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER)
def attach(self, request):
@@ -186,43 +147,37 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
@route('GET', ['context', None], cmd='clone',
arguments={'requires': list})
def get_clone(self, request, response):
- return self._get_clone(request, response)
+ deps = {}
+ if 'requires' in request:
+ for i in request['requires']:
+ deps.update(parse_requires(i))
+ version = request.get('version')
+ if version:
+ version = parse_version(version)[0]
+ stability = request.get('stability') or 'stable'
+
+ recent = None
+ context = self.volume['context'][request.guid]
+ for release in context['releases'].values():
+ release = release.get('value')
+ if not release:
+ continue
+ spec = release['spec']['*-*']
+ if version and version != release['release'][0] or \
+ stability and stability != release['stability'] or \
+ deps and not ensure_requires(spec['requires'], deps):
+ continue
+ if recent is None or release['release'] > recent['release']:
+ recent = release
+ enforce(recent, http.NotFound, 'No releases found')
+
+ response.meta = recent
+ return files.get(recent['spec']['*-*']['bundle'])
@route('HEAD', ['context', None], cmd='clone',
arguments={'requires': list})
def head_clone(self, request, response):
- self._get_clone(request, response)
-
- @route('GET', ['context', None], cmd='deplist',
- mime_type='application/json', arguments={'requires': list})
- def deplist(self, request, repo):
- """List of native packages context is dependening on.
-
- Command return only GNU/Linux package names and ignores
- Sugar Network dependencies.
-
- :param repo:
- OBS repository name to get package names for, e.g.,
- Fedora-14
- :returns:
- list of package names
-
- """
- enforce(repo, 'Argument %r should be set', 'repo')
-
- spec = self._solve(request).meta('data')['spec']['*-*']
- common_deps = self.volume['context'].get(request.guid)['dependencies']
- result = []
-
- for package in set(spec.get('requires') or []) | set(common_deps):
- if package == 'sugar':
- continue
- dep = self.volume['context'].get(package)
- enforce(repo in dep['packages'],
- 'No packages for %r on %r', package, repo)
- result.extend(dep['packages'][repo].get('binary') or [])
-
- return result
+ self.get_clone(request, response)
@route('GET', ['user', None], cmd='stats-info',
mime_type='application/json', acl=ACL.AUTH)
@@ -246,15 +201,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
for timestamp, values in values:
rrd[name].put(values, timestamp)
- @route('GET', ['report', None], cmd='log', mime_type='text/html')
- def log(self, request):
- # In further implementations, `data` might be a tarball
- data = self.volume[request.resource].get(request.guid).meta('data')
- if data and 'blob' in data:
- return file(data['blob'], 'rb')
- else:
- return ''
-
@preroute
def preroute(self, op, request, response):
if op.acl & ACL.AUTH and request.principal is None:
@@ -277,22 +223,11 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
enforce(self.authorize(request.principal, 'root'), http.Forbidden,
'Operation is permitted only for superusers')
- @postroute
- def postroute(self, request, response, result, error):
- if error is None or isinstance(error, http.StatusPass):
- if self._stats is not None:
- self._stats.log(request)
-
- def on_create(self, request, props, event):
+ def on_create(self, request, props):
if request.resource == 'user':
- with file(props['pubkey']['blob']) as f:
+ with file(files.get(props['pubkey']).path) as f:
props['guid'] = str(hashlib.sha1(f.read()).hexdigest())
- model.VolumeRoutes.on_create(self, request, props, event)
-
- def on_update(self, request, props, event):
- model.VolumeRoutes.on_update(self, request, props, event)
- if 'deleted' in props.get('layer', []):
- event['event'] = 'delete'
+ db.Routes.on_create(self, request, props)
def on_aggprop_update(self, request, prop, value):
if prop.acl & ACL.AUTHOR:
@@ -300,27 +235,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
elif value is not None:
self._enforce_authority(request, value.get('author'))
- def find(self, request, reply):
- limit = request.get('limit')
- if limit is None or limit < 0:
- request['limit'] = node.find_limit.value
- elif limit > node.find_limit.value:
- _logger.warning('The find limit is restricted to %s',
- node.find_limit.value)
- request['limit'] = node.find_limit.value
- layer = request.setdefault('layer', [])
- if 'deleted' in layer:
- _logger.warning('Requesting "deleted" layer')
- layer.remove('deleted')
- request.add('not_layer', 'deleted')
- return model.VolumeRoutes.find(self, request, reply)
-
- def get(self, request, reply):
- doc = self.volume[request.resource].get(request.guid)
- enforce('deleted' not in doc['layer'], http.NotFound,
- 'Resource deleted')
- return model.VolumeRoutes.get(self, request, reply)
-
def authenticate(self, auth):
enforce(auth.scheme == 'sugar', http.BadRequest,
'Unknown authentication scheme')
@@ -329,8 +243,9 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
from M2Crypto import RSA
+ pubkey = self.volume['user'][auth.login]['pubkey']
+ key = RSA.load_pub_key(files.get(pubkey).path)
data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest()
- key = RSA.load_pub_key(self.volume['user'].path(auth.login, 'pubkey'))
enforce(key.verify(data, auth.signature.decode('hex')),
http.Forbidden, 'Bad credentials')
@@ -356,52 +271,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
return self._auth_config.get(user, role).strip().lower() in \
('true', 'on', '1', 'allow')
- def _commit_stats(self):
- while True:
- coroutine.sleep(stats_node.stats_node_step.value)
- self._stats.commit()
-
- def _solve(self, request):
- requires = {}
- if 'requires' in request:
- for i in request['requires']:
- requires.update(parse_requires(i))
- request.pop('requires')
- else:
- request['limit'] = 1
-
- if 'stability' not in request:
- request['stability'] = 'stable'
-
- impls, __ = self.volume['release'].find(
- context=request.guid, order_by='-version', not_layer='deleted',
- **request)
- impl = None
- for impl in impls:
- if requires:
- impl_deps = impl.meta('data')['spec']['*-*']['requires']
- if not ensure_requires(impl_deps, requires):
- continue
- break
- else:
- raise http.NotFound('No releases found')
- return impl
-
- def _get_clone(self, request, response):
- impl = self._solve(request)
- result = request.call(method=request.method,
- path=['release', impl['guid'], 'data'],
- response=response)
- response.meta = impl.properties([
- 'guid', 'ctime', 'layer', 'author', 'tags',
- 'context', 'version', 'stability', 'license', 'notes',
- ])
- response.meta['data'] = data = impl.meta('data')
- for key in ('mtime', 'seqno', 'blob'):
- if key in data:
- del data[key]
- return result
-
def _enforce_authority(self, request, author=None):
if request.resource == 'user':
allowed = (request.principal == request.guid)
@@ -412,222 +281,3 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
allowed = request.principal in author
enforce(allowed or self.authorize(request.principal, 'root'),
http.Forbidden, 'Operation is permitted only for authors')
-
-
-def generate_node_stats(volume, path):
- tmp_path = toolkit.mkdtemp()
- new_stats = stats_node.Sniffer(volume, tmp_path, True)
- old_stats = stats_node.Sniffer(volume, path)
-
- def timeline(ts):
- ts = long(ts)
- end = long(time.time())
- step = None
-
- archives = {}
- for rra in stats_node.stats_node_rras.value:
- a_step, a_size = [long(i) for i in rra.split(':')[-2:]]
- a_step *= stats_node.stats_node_step.value
- a_start = end - min(end, a_step * a_size)
- if archives.setdefault(a_start, a_step) > a_step:
- archives[a_start] = a_step
- archives = list(sorted(archives.items()))
-
- try:
- while ts <= end:
- while not step or archives and ts >= archives[0][0]:
- archive_start, step = archives.pop(0)
- ts = max(ts / step * step, archive_start)
- yield ts, ts + step - 1, step
- ts += step
- except GeneratorExit:
- shutil.rmtree(tmp_path, ignore_errors=True)
-
- start = next(volume['context'].find(limit=1, order_by='ctime')[0])['ctime']
- for left, right, step in timeline(start):
- for resource, props in [
- ('user', []),
- ('context', []),
- ('release', ['context']),
- ('report', ['context', 'release']),
- ('post', ['context', 'topic', 'type', 'vote']),
- ]:
- objs, __ = volume[resource].find(
- query='ctime:%s..%s' % (left, right))
- for obj in objs:
- request = Request(method='POST', path=[resource],
- content=obj.properties(props))
- new_stats.log(request)
- for resource, props in [
- ('user', ['layer']),
- ('context', ['layer']),
- ('release', ['layer']),
- ('report', ['layer']),
- ('post', ['layer']),
- ]:
- objs, __ = volume[resource].find(
- query='mtime:%s..%s' % (left, right))
- for obj in objs:
- if 'deleted' in obj['layer']:
- request = Request(method='DELETE',
- path=[resource, obj.guid])
- else:
- request = Request(method='PUT', path=[resource, obj.guid],
- content=obj.properties(props))
- new_stats.log(request)
- downloaded = {}
- for resource in ('context', 'post'):
- stats = old_stats.report(
- {resource: ['downloaded']}, left - step, right, 1)
- if not stats.get(resource):
- continue
- stats = stats[resource][-1][1].get('downloaded')
- if stats:
- downloaded[resource] = {'downloaded': stats}
- new_stats.commit(left + (right - left) / 2, downloaded)
-
- new_stats.commit_objects(True)
- shutil.rmtree(path)
- shutil.move(tmp_path, path)
-
-
-@contextmanager
-def load_bundle(volume, request, bundle_path):
- impl = request.copy()
- initial = False
- if 'initial' in impl:
- initial = impl.pop('initial')
- data = impl.setdefault('data', {})
- contexts = volume['context']
- context = impl.get('context')
- context_meta = None
- impls = volume['release']
-
- try:
- bundle = Bundle(bundle_path, mime_type='application/zip')
- except Exception:
- _logger.debug('Load unrecognized bundle from %r', bundle_path)
- context_type = 'book'
- else:
- _logger.debug('Load Sugar Activity bundle from %r', bundle_path)
- context_type = 'activity'
- unpack_size = 0
-
- with bundle:
- changelog = join(bundle.rootdir, 'CHANGELOG')
- for arcname in bundle.get_names():
- if changelog and arcname == changelog:
- with bundle.extractfile(changelog) as f:
- impl['notes'] = f.read()
- changelog = None
- unpack_size += bundle.getmember(arcname).size
- spec = bundle.get_spec()
- context_meta = _load_context_metadata(bundle, spec)
- if 'requires' in impl:
- spec.requires.update(parse_requires(impl.pop('requires')))
-
- context = impl['context'] = spec['context']
- impl['version'] = spec['version']
- impl['stability'] = spec['stability']
- if spec['license'] is not EMPTY_LICENSE:
- impl['license'] = spec['license']
- requires = impl['requires'] = []
- for dep_name, dep in spec.requires.items():
- found = False
- for version in dep.versions_range():
- requires.append('%s-%s' % (dep_name, version))
- found = True
- if not found:
- requires.append(dep_name)
-
- data['spec'] = {'*-*': {
- 'commands': spec.commands,
- 'requires': spec.requires,
- }}
- data['unpack_size'] = unpack_size
- data['mime_type'] = 'application/vnd.olpc-sugar'
-
- if initial and not contexts.exists(context):
- context_meta['type'] = 'activity'
- request.call(method='POST', path=['context'], content=context_meta)
- context_meta = None
-
- enforce(context, 'Context is not specified')
- enforce('version' in impl, 'Version is not specified')
- enforce(context_type in contexts.get(context)['type'],
- http.BadRequest, 'Inappropriate bundle type')
- if 'license' not in impl:
- existing, total = impls.find(
- context=context, order_by='-version', not_layer='deleted')
- enforce(total, 'License is not specified')
- impl['license'] = next(existing)['license']
-
- digest = hashlib.sha1()
- with file(bundle_path, 'rb') as f:
- while True:
- chunk = f.read(toolkit.BUFFER_SIZE)
- if not chunk:
- break
- digest.update(chunk)
- data['digest'] = digest.hexdigest()
-
- yield impl
-
- existing, __ = impls.find(
- context=context, version=impl['version'], not_layer='deleted')
- if 'url' not in data:
- data['blob'] = bundle_path
- impl['guid'] = request.call(method='POST', path=['release'], content=impl)
- for i in existing:
- layer = i['layer'] + ['deleted']
- impls.update(i.guid, {'layer': layer})
-
- if 'origin' in impls.get(impl['guid']).layer:
- diff = contexts.patch(context, context_meta)
- if diff:
- request.call(method='PUT', path=['context', context], content=diff)
-
-
-def _load_context_metadata(bundle, spec):
- result = {}
- for prop in ('homepage', 'mime_types'):
- if spec[prop]:
- result[prop] = spec[prop]
- result['guid'] = spec['context']
-
- try:
- icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon']))
- Context.populate_images(result, icon_file.read())
- icon_file.close()
- except Exception:
- exception(_logger, 'Failed to load icon')
-
- msgids = {}
- for prop, confname in [
- ('title', 'name'),
- ('summary', 'summary'),
- ('description', 'description'),
- ]:
- if spec[confname]:
- msgids[prop] = spec[confname]
- result[prop] = {'en': spec[confname]}
- with toolkit.mkdtemp() as tmpdir:
- for path in bundle.get_names():
- if not path.endswith('.mo'):
- continue
- mo_path = path.strip(os.sep).split(os.sep)
- if len(mo_path) != 5 or mo_path[1] != 'locale':
- continue
- lang = mo_path[2]
- bundle.extract(path, tmpdir)
- try:
- i18n = gettext.translation(spec['context'],
- join(tmpdir, *mo_path[:2]), [lang])
- for prop, value in msgids.items():
- msgstr = i18n.gettext(value).decode('utf8')
- if lang == 'en' or msgstr != value:
- result[prop][lang] = msgstr
- except Exception:
- exception(_logger, 'Gettext failed to read %r', mo_path[-1])
-
- return result