diff options
Diffstat (limited to 'sugar_network/node')
-rw-r--r-- | sugar_network/node/master.py | 73 | ||||
-rw-r--r-- | sugar_network/node/model.py | 177 | ||||
-rw-r--r-- | sugar_network/node/obs.py | 116 | ||||
-rw-r--r-- | sugar_network/node/routes.py | 466 | ||||
-rw-r--r-- | sugar_network/node/slave.py | 13 | ||||
-rw-r--r-- | sugar_network/node/stats_node.py | 311 | ||||
-rw-r--r-- | sugar_network/node/sync.py | 2 | ||||
-rw-r--r-- | sugar_network/node/volume.py | 142 |
8 files changed, 310 insertions, 990 deletions
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index 19a8cf1..c7c22e0 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -20,12 +20,19 @@ from Cookie import SimpleCookie from os.path import join from sugar_network import node, toolkit -from sugar_network.node import sync, stats_user, files, volume, downloads, obs +from sugar_network.node import sync, stats_user, files, model, downloads, obs from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL -from sugar_network.toolkit import http, coroutine, enforce +from sugar_network.toolkit import http, enforce +RESOURCES = ( + 'sugar_network.node.model', + 'sugar_network.model.post', + 'sugar_network.model.report', + 'sugar_network.model.user', + ) + _ONE_WAY_DOCUMENTS = ['report'] _logger = logging.getLogger('node.master') @@ -33,12 +40,12 @@ _logger = logging.getLogger('node.master') class MasterRoutes(NodeRoutes): - def __init__(self, guid, volume_): - NodeRoutes.__init__(self, guid, volume_) + def __init__(self, guid, volume, **kwargs): + NodeRoutes.__init__(self, guid, volume=volume, **kwargs) self._pulls = { 'pull': lambda **kwargs: - ('diff', None, volume.diff(self.volume, + ('diff', None, model.diff(self.volume, ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)), 'files_pull': lambda **kwargs: ('files_diff', None, self._files.diff(**kwargs)), @@ -50,7 +57,7 @@ class MasterRoutes(NodeRoutes): if node.files_root.value: self._files = files.Index(node.files_root.value, - join(volume_.root, 'files.index'), volume_.seqno) + join(volume.root, 'files.index'), volume.seqno) @route('POST', cmd='sync', acl=ACL.AUTH) @@ -137,25 +144,13 @@ class MasterRoutes(NodeRoutes): enforce(node.files_root.value, http.BadRequest, 'Disabled') aliases = self.volume['context'].get(request.guid)['aliases'] enforce(aliases, http.BadRequest, 'Nothing to presolve') - return obs.presolve(aliases, node.files_root.value) + return obs.presolve(None, aliases, node.files_root.value) def status(self): result = NodeRoutes.status(self) result['level'] = 'master' return result - def after_post(self, doc): - if doc.metadata.name == 'context': - shift_releases = doc.modified('dependencies') - if doc.modified('aliases'): - # TODO Already launched job should be killed - coroutine.spawn(self._resolve_aliases, doc) - shift_releases = True - if shift_releases and not doc.is_new: - # Shift checkpoint to invalidate solutions - self.volume['release'].checkpoint() - NodeRoutes.after_post(self, doc) - def _push(self, stream): reply = [] cookie = _Cookie() @@ -172,8 +167,7 @@ class MasterRoutes(NodeRoutes): if self._files is not None: cookie['files_pull'].include(packet['sequence']) elif packet.name == 'diff': - seq, ack_seq = volume.merge(self.volume, packet, - stats=self._stats) + seq, ack_seq = model.merge(self.volume, packet) reply.append(('ack', { 'ack': ack_seq, 'sequence': seq, @@ -189,43 +183,6 @@ class MasterRoutes(NodeRoutes): return reply, cookie - def _resolve_aliases(self, doc): - packages = {} - for repo in obs.get_repos(): - alias = doc['aliases'].get(repo['distributor_id']) - if not alias: - continue - package = packages[repo['name']] = {} - for kind in ('binary', 'devel'): - obs_fails = [] - for to_resolve in alias.get(kind) or []: - if not to_resolve: - continue - try: - for arch in repo['arches']: - obs.resolve(repo['name'], arch, to_resolve) - except Exception, error: - _logger.warning('Failed to resolve %r on %s', - to_resolve, repo['name']) - obs_fails.append(str(error)) - continue - package[kind] = to_resolve - break - else: - package['status'] = '; '.join(obs_fails) - break - else: - if 'binary' in package: - package['status'] = 'success' - else: - package['status'] = 'no packages to resolve' - - if packages != doc['packages']: - self.volume['context'].update(doc.guid, {'packages': packages}) - - if node.files_root.value: - obs.presolve(doc['aliases'], node.files_root.value) - class _Cookie(list): diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py new file mode 100644 index 0000000..2681b2d --- /dev/null +++ b/sugar_network/node/model.py @@ -0,0 +1,177 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging + +from sugar_network import db, toolkit +from sugar_network.model import Release, context +from sugar_network.node import obs +from sugar_network.toolkit.router import ACL +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import http, coroutine, enforce + + +_logger = logging.getLogger('node.model') +_presolve_queue = None + + +class _Release(Release): + + _package_cast = db.Dict(db.List()) + + def typecast(self, value): + if not this.resource.exists or 'package' not in this.resource['type']: + return Release.typecast(self, value) + + value = self._package_cast.typecast(value) + enforce(value.get('binary'), http.BadRequest, 'No binary aliases') + + distro = this.request.key + if distro == '*': + lsb_id = None + lsb_release = None + elif '-' in this.request.key: + lsb_id, lsb_release = distro.split('-', 1) + else: + lsb_id = distro + lsb_release = None + releases = this.resource.record.get('releases') + statuses = releases['value'].setdefault('status', {}) + to_presolve = [] + + for repo in obs.get_repos(): + if lsb_id and lsb_id != repo['lsb_id'] or \ + lsb_release and lsb_release != repo['lsb_release']: + continue + # Make sure there are no alias overrides + if not lsb_id and repo['lsb_id'] in releases['value'] or \ + not lsb_release and repo['name'] in releases['value']: + continue + pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], []) + try: + for arch in repo['arches']: + obs.resolve(repo['name'], arch, pkgs) + except Exception, error: + _logger.warning('Failed to resolve %r on %s', + pkgs, repo['name']) + status = str(error) + else: + to_presolve.append((repo['name'], pkgs)) + status = 'success' + statuses[repo['name']] = status + + if to_presolve and _presolve_queue is not None: + _presolve_queue.put(to_presolve) + if statuses: + this.resource.record.set('releases', **releases) + + return value + + def teardown(self, value): + if 'package' not in this.resource['type']: + return Release.typecast(self, value) + # TODO Delete presolved files + + +class Context(context.Context): + + @db.stored_property(db.Aggregated, subtype=_Release(), + acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE) + def releases(self, value): + return value + + @releases.setter + def releases(self, value): + if value or this.request.method != 'POST': + self.invalidate_solutions() + return value + + +def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, + ignore_documents=None, **kwargs): + if out_seq is None: + out_seq = toolkit.Sequence([]) + is_the_only_seq = not out_seq + if layer: + if isinstance(layer, basestring): + layer = [layer] + layer.append('common') + try: + for resource, directory in volume.items(): + if ignore_documents and resource in ignore_documents: + continue + coroutine.dispatch() + directory.commit() + yield {'resource': resource} + for guid, patch in directory.diff(in_seq, exclude_seq, + layer=layer if resource == 'context' else None): + adiff = {} + adiff_seq = toolkit.Sequence() + for prop, meta, seqno in patch: + adiff[prop] = meta + adiff_seq.include(seqno, seqno) + if adiff: + yield {'guid': guid, 'diff': adiff} + out_seq.include(adiff_seq) + if is_the_only_seq: + # There is only one diff, so, we can stretch it to remove all holes + out_seq.stretch() + except StopIteration: + pass + + yield {'commit': out_seq} + + +def merge(volume, records): + directory = None + commit_seq = toolkit.Sequence() + merged_seq = toolkit.Sequence() + synced = False + + for record in records: + resource_ = record.get('resource') + if resource_: + resource = resource_ + directory = volume[resource_] + continue + + if 'guid' in record: + guid = record['guid'] + existed = directory.exists(guid) + if existed: + layer = directory.get(guid)['layer'] + seqno, merged = directory.merge(**record) + synced = synced or merged + if seqno is not None: + merged_seq.include(seqno, seqno) + continue + + commit = record.get('commit') + if commit is not None: + commit_seq.include(commit) + continue + + if synced: + this.broadcast({'event': 'sync'}) + + return commit_seq, merged_seq + + +def presolve(presolve_path): + global _presolve_queue + _presolve_queue = coroutine.Queue() + + for repo_name, pkgs in _presolve_queue: + obs.presolve(repo_name, pkgs, presolve_path) diff --git a/sugar_network/node/obs.py b/sugar_network/node/obs.py index 1d8a547..6ef9e55 100644 --- a/sugar_network/node/obs.py +++ b/sugar_network/node/obs.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -37,7 +37,7 @@ obs_presolve_project = Option( default='presolve') _logger = logging.getLogger('node.obs') -_client = None +_conn = None _repos = {} @@ -45,82 +45,68 @@ def get_repos(): return _get_repos(obs_project.value) -def resolve(repo, arch, names): - for package in names: - _request('GET', ['resolve'], params={ - 'project': obs_project.value, - 'repository': repo, - 'arch': arch, - 'package': package, - }) +def resolve(repo, arch, packages): + _request('GET', ['resolve'], params={ + 'project': obs_project.value, + 'repository': repo, + 'arch': arch, + 'package': packages, + }) -def presolve(aliases, dst_path): +def presolve(repo_name, packages, dst_path): for repo in _get_repos(obs_presolve_project.value): - # Presolves make sense only for XO, thus, for Fedora - alias = aliases.get('Fedora') - if not alias: - continue - - name_variants = alias['binary'] - while name_variants: - names = name_variants.pop() - presolves = [] + dst_dir = join(dst_path, 'packages', + obs_presolve_project.value, repo['name']) + result = {} + to_download = [] + + for package in packages: + files = result.setdefault(package, {}) try: - for arch in repo['arches']: - for package in names: - response = _request('GET', ['resolve'], params={ - 'project': obs_presolve_project.value, - 'repository': repo['name'], - 'arch': arch, - 'package': package, - 'withdeps': '1', - 'exclude': 'sweets-sugar', - }) - binaries = [] - for pkg in response.findall('binary'): - binaries.append(dict(pkg.items())) - presolves.append((package, binaries)) + for repo_arch in repo['arches']: + response = _request('GET', ['resolve'], params={ + 'project': obs_presolve_project.value, + 'repository': '%(lsb_id)s-%(lsb_release)s' % repo, + 'arch': repo_arch, + 'package': package, + 'withdeps': '1', + 'exclude': 'sweets-sugar', + }) + for binary in response.findall('binary'): + binary = dict(binary.items()) + arch = binary.pop('arch') + url = binary.pop('url') + filename = binary['path'] = basename(url) + path = join(dst_dir, filename) + if not exists(path): + to_download.append((url, path)) + files.setdefault(arch, []).append(binary) except Exception: toolkit.exception(_logger, 'Failed to presolve %r on %s', - names, repo['name']) + packages, repo['name']) continue - _logger.debug('Presolve %r on %s', names, repo['name']) - - dst_dir = join(dst_path, 'packages', - obs_presolve_project.value, repo['name']) - if not exists(dst_dir): - os.makedirs(dst_dir) - result = {} - - for package, binaries in presolves: - files = [] - for binary in binaries: - arch = binary.pop('arch') - if not files: - result.setdefault(package, {})[arch] = files - url = binary.pop('url') - filename = binary['path'] = basename(url) - path = join(dst_dir, filename) - if not exists(path): - _client.download(url, path) - files.append(binary) + _logger.debug('Presolve %r on %s', packages, repo['name']) - for package, info in result.items(): - with toolkit.new_file(join(dst_dir, package)) as f: - json.dump(info, f) + if not exists(dst_dir): + os.makedirs(dst_dir) + for url, path in to_download: + _conn.download(url, path) + for package, info in result.items(): + with toolkit.new_file(join(dst_dir, package)) as f: + json.dump(info, f) - return {'repo': repo['name'], 'packages': result} + return {'repo': repo['name'], 'packages': result} def _request(*args, **kwargs): - global _client + global _conn - if _client is None: - _client = http.Connection(obs_url.value) + if _conn is None: + _conn = http.Connection(obs_url.value) - response = _client.request(*args, allowed=(400, 404), **kwargs) + response = _conn.request(*args, allowed=(400, 404), **kwargs) enforce(response.headers.get('Content-Type') == 'text/xml', 'Irregular OBS response') reply = ElementTree.fromstring(response.content) @@ -144,8 +130,10 @@ def _get_repos(project): for repo in _request('GET', ['build', project]).findall('entry'): repo = repo.get('name') arches = _request('GET', ['build', project, repo]) + lsb_id, lsb_release = repo.split('-', 1) repos.append({ - 'distributor_id': repo.split('-', 1)[0], + 'lsb_id': lsb_id, + 'lsb_release': lsb_release, 'name': repo, 'arches': [i.get('name') for i in arches.findall('entry')], }) diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index eb48c70..6323cbc 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,23 +15,21 @@ import os import time -import shutil -import gettext import logging import hashlib -from contextlib import contextmanager from ConfigParser import ConfigParser from os.path import join, isdir, exists -from sugar_network import node, toolkit, model -from sugar_network.node import stats_node, stats_user -from sugar_network.model.context import Context +from sugar_network import db, node, toolkit, model +from sugar_network.db import files +from sugar_network.node import stats_user # pylint: disable-msg=W0611 from sugar_network.toolkit.router import route, preroute, postroute, ACL from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute -from sugar_network.toolkit.spec import EMPTY_LICENSE from sugar_network.toolkit.spec import parse_requires, ensure_requires +from sugar_network.toolkit.spec import parse_version from sugar_network.toolkit.bundle import Bundle +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import pylru, http, coroutine, exception, enforce @@ -41,28 +39,16 @@ _AUTH_POOL_SIZE = 1024 _logger = logging.getLogger('node.routes') -class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): +class NodeRoutes(db.Routes, model.FrontRoutes): - def __init__(self, guid, volume): - model.VolumeRoutes.__init__(self, volume) + def __init__(self, guid, **kwargs): + db.Routes.__init__(self, **kwargs) model.FrontRoutes.__init__(self) - volume.broadcast = self.broadcast - self._guid = guid - self._stats = None self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE) self._auth_config = None self._auth_config_mtime = 0 - if stats_node.stats_node.value: - stats_path = join(node.stats_root.value, 'node') - self._stats = stats_node.Sniffer(volume, stats_path) - coroutine.spawn(self._commit_stats) - - def close(self): - if self._stats is not None: - self._stats.suspend() - @property def guid(self): return self._guid @@ -80,33 +66,12 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): @route('GET', cmd='status', mime_type='application/json') def status(self): - documents = {} - for name, directory in self.volume.items(): - documents[name] = {'mtime': directory.mtime} - return {'guid': self._guid, 'resources': documents} - - @route('GET', cmd='stats', arguments={ - 'start': int, 'end': int, 'records': int, 'source': list}, - mime_type='application/json') - def stats(self, start, end, records, source): - enforce(self._stats is not None, 'Node stats is disabled') - if not source: - return {} - - if records > _MAX_STAT_RECORDS: - _logger.debug('Decrease %d stats records number to %d', - records, _MAX_STAT_RECORDS) - records = _MAX_STAT_RECORDS - elif records <= 0: - records = _MAX_STAT_RECORDS / 10 - - stats = {} - for i in source: - enforce('.' in i, 'Misnamed source') - db_name, ds_name = i.split('.', 1) - stats.setdefault(db_name, []).append(ds_name) - - return self._stats.report(stats, start, end, records) + return {'guid': self._guid, + 'seqno': { + 'db': self.volume.seqno.value, + 'releases': self.volume.releases_seqno.value, + }, + } @route('POST', ['user'], mime_type='application/json') def register(self, request): @@ -149,23 +114,19 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): response.content_type = 'application/json' return result - @route('POST', ['release'], cmd='submit', + @route('POST', ['context'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) - def submit_release(self, request, document): - with toolkit.NamedTemporaryFile() as blob: - shutil.copyfileobj(request.content_stream, blob) - blob.flush() - with load_bundle(self.volume, request, blob.name) as impl: - impl['data']['blob'] = blob.name - return impl['guid'] - - @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) - def delete(self, request): - # Servers data should not be deleted immediately - # to let master-slave synchronization possible - request.call(method='PUT', path=request.path, - content={'layer': ['deleted']}) + def submit_release(self, request, initial): + blob = files.post(request.content_stream) + try: + context, release = model.load_bundle(blob, initial=initial) + except Exception: + files.delete(blob.digest) + raise + this.call(method='POST', path=['context', context, 'releases'], + content_type='application/json', content=release) + return blob.digest @route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER) def attach(self, request): @@ -186,43 +147,37 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): @route('GET', ['context', None], cmd='clone', arguments={'requires': list}) def get_clone(self, request, response): - return self._get_clone(request, response) + deps = {} + if 'requires' in request: + for i in request['requires']: + deps.update(parse_requires(i)) + version = request.get('version') + if version: + version = parse_version(version)[0] + stability = request.get('stability') or 'stable' + + recent = None + context = self.volume['context'][request.guid] + for release in context['releases'].values(): + release = release.get('value') + if not release: + continue + spec = release['spec']['*-*'] + if version and version != release['release'][0] or \ + stability and stability != release['stability'] or \ + deps and not ensure_requires(spec['requires'], deps): + continue + if recent is None or release['release'] > recent['release']: + recent = release + enforce(recent, http.NotFound, 'No releases found') + + response.meta = recent + return files.get(recent['spec']['*-*']['bundle']) @route('HEAD', ['context', None], cmd='clone', arguments={'requires': list}) def head_clone(self, request, response): - self._get_clone(request, response) - - @route('GET', ['context', None], cmd='deplist', - mime_type='application/json', arguments={'requires': list}) - def deplist(self, request, repo): - """List of native packages context is dependening on. - - Command return only GNU/Linux package names and ignores - Sugar Network dependencies. - - :param repo: - OBS repository name to get package names for, e.g., - Fedora-14 - :returns: - list of package names - - """ - enforce(repo, 'Argument %r should be set', 'repo') - - spec = self._solve(request).meta('data')['spec']['*-*'] - common_deps = self.volume['context'].get(request.guid)['dependencies'] - result = [] - - for package in set(spec.get('requires') or []) | set(common_deps): - if package == 'sugar': - continue - dep = self.volume['context'].get(package) - enforce(repo in dep['packages'], - 'No packages for %r on %r', package, repo) - result.extend(dep['packages'][repo].get('binary') or []) - - return result + self.get_clone(request, response) @route('GET', ['user', None], cmd='stats-info', mime_type='application/json', acl=ACL.AUTH) @@ -246,15 +201,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): for timestamp, values in values: rrd[name].put(values, timestamp) - @route('GET', ['report', None], cmd='log', mime_type='text/html') - def log(self, request): - # In further implementations, `data` might be a tarball - data = self.volume[request.resource].get(request.guid).meta('data') - if data and 'blob' in data: - return file(data['blob'], 'rb') - else: - return '' - @preroute def preroute(self, op, request, response): if op.acl & ACL.AUTH and request.principal is None: @@ -277,22 +223,11 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): enforce(self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for superusers') - @postroute - def postroute(self, request, response, result, error): - if error is None or isinstance(error, http.StatusPass): - if self._stats is not None: - self._stats.log(request) - - def on_create(self, request, props, event): + def on_create(self, request, props): if request.resource == 'user': - with file(props['pubkey']['blob']) as f: + with file(files.get(props['pubkey']).path) as f: props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) - model.VolumeRoutes.on_create(self, request, props, event) - - def on_update(self, request, props, event): - model.VolumeRoutes.on_update(self, request, props, event) - if 'deleted' in props.get('layer', []): - event['event'] = 'delete' + db.Routes.on_create(self, request, props) def on_aggprop_update(self, request, prop, value): if prop.acl & ACL.AUTHOR: @@ -300,27 +235,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): elif value is not None: self._enforce_authority(request, value.get('author')) - def find(self, request, reply): - limit = request.get('limit') - if limit is None or limit < 0: - request['limit'] = node.find_limit.value - elif limit > node.find_limit.value: - _logger.warning('The find limit is restricted to %s', - node.find_limit.value) - request['limit'] = node.find_limit.value - layer = request.setdefault('layer', []) - if 'deleted' in layer: - _logger.warning('Requesting "deleted" layer') - layer.remove('deleted') - request.add('not_layer', 'deleted') - return model.VolumeRoutes.find(self, request, reply) - - def get(self, request, reply): - doc = self.volume[request.resource].get(request.guid) - enforce('deleted' not in doc['layer'], http.NotFound, - 'Resource deleted') - return model.VolumeRoutes.get(self, request, reply) - def authenticate(self, auth): enforce(auth.scheme == 'sugar', http.BadRequest, 'Unknown authentication scheme') @@ -329,8 +243,9 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): from M2Crypto import RSA + pubkey = self.volume['user'][auth.login]['pubkey'] + key = RSA.load_pub_key(files.get(pubkey).path) data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() - key = RSA.load_pub_key(self.volume['user'].path(auth.login, 'pubkey')) enforce(key.verify(data, auth.signature.decode('hex')), http.Forbidden, 'Bad credentials') @@ -356,52 +271,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): return self._auth_config.get(user, role).strip().lower() in \ ('true', 'on', '1', 'allow') - def _commit_stats(self): - while True: - coroutine.sleep(stats_node.stats_node_step.value) - self._stats.commit() - - def _solve(self, request): - requires = {} - if 'requires' in request: - for i in request['requires']: - requires.update(parse_requires(i)) - request.pop('requires') - else: - request['limit'] = 1 - - if 'stability' not in request: - request['stability'] = 'stable' - - impls, __ = self.volume['release'].find( - context=request.guid, order_by='-version', not_layer='deleted', - **request) - impl = None - for impl in impls: - if requires: - impl_deps = impl.meta('data')['spec']['*-*']['requires'] - if not ensure_requires(impl_deps, requires): - continue - break - else: - raise http.NotFound('No releases found') - return impl - - def _get_clone(self, request, response): - impl = self._solve(request) - result = request.call(method=request.method, - path=['release', impl['guid'], 'data'], - response=response) - response.meta = impl.properties([ - 'guid', 'ctime', 'layer', 'author', 'tags', - 'context', 'version', 'stability', 'license', 'notes', - ]) - response.meta['data'] = data = impl.meta('data') - for key in ('mtime', 'seqno', 'blob'): - if key in data: - del data[key] - return result - def _enforce_authority(self, request, author=None): if request.resource == 'user': allowed = (request.principal == request.guid) @@ -412,222 +281,3 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes): allowed = request.principal in author enforce(allowed or self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for authors') - - -def generate_node_stats(volume, path): - tmp_path = toolkit.mkdtemp() - new_stats = stats_node.Sniffer(volume, tmp_path, True) - old_stats = stats_node.Sniffer(volume, path) - - def timeline(ts): - ts = long(ts) - end = long(time.time()) - step = None - - archives = {} - for rra in stats_node.stats_node_rras.value: - a_step, a_size = [long(i) for i in rra.split(':')[-2:]] - a_step *= stats_node.stats_node_step.value - a_start = end - min(end, a_step * a_size) - if archives.setdefault(a_start, a_step) > a_step: - archives[a_start] = a_step - archives = list(sorted(archives.items())) - - try: - while ts <= end: - while not step or archives and ts >= archives[0][0]: - archive_start, step = archives.pop(0) - ts = max(ts / step * step, archive_start) - yield ts, ts + step - 1, step - ts += step - except GeneratorExit: - shutil.rmtree(tmp_path, ignore_errors=True) - - start = next(volume['context'].find(limit=1, order_by='ctime')[0])['ctime'] - for left, right, step in timeline(start): - for resource, props in [ - ('user', []), - ('context', []), - ('release', ['context']), - ('report', ['context', 'release']), - ('post', ['context', 'topic', 'type', 'vote']), - ]: - objs, __ = volume[resource].find( - query='ctime:%s..%s' % (left, right)) - for obj in objs: - request = Request(method='POST', path=[resource], - content=obj.properties(props)) - new_stats.log(request) - for resource, props in [ - ('user', ['layer']), - ('context', ['layer']), - ('release', ['layer']), - ('report', ['layer']), - ('post', ['layer']), - ]: - objs, __ = volume[resource].find( - query='mtime:%s..%s' % (left, right)) - for obj in objs: - if 'deleted' in obj['layer']: - request = Request(method='DELETE', - path=[resource, obj.guid]) - else: - request = Request(method='PUT', path=[resource, obj.guid], - content=obj.properties(props)) - new_stats.log(request) - downloaded = {} - for resource in ('context', 'post'): - stats = old_stats.report( - {resource: ['downloaded']}, left - step, right, 1) - if not stats.get(resource): - continue - stats = stats[resource][-1][1].get('downloaded') - if stats: - downloaded[resource] = {'downloaded': stats} - new_stats.commit(left + (right - left) / 2, downloaded) - - new_stats.commit_objects(True) - shutil.rmtree(path) - shutil.move(tmp_path, path) - - -@contextmanager -def load_bundle(volume, request, bundle_path): - impl = request.copy() - initial = False - if 'initial' in impl: - initial = impl.pop('initial') - data = impl.setdefault('data', {}) - contexts = volume['context'] - context = impl.get('context') - context_meta = None - impls = volume['release'] - - try: - bundle = Bundle(bundle_path, mime_type='application/zip') - except Exception: - _logger.debug('Load unrecognized bundle from %r', bundle_path) - context_type = 'book' - else: - _logger.debug('Load Sugar Activity bundle from %r', bundle_path) - context_type = 'activity' - unpack_size = 0 - - with bundle: - changelog = join(bundle.rootdir, 'CHANGELOG') - for arcname in bundle.get_names(): - if changelog and arcname == changelog: - with bundle.extractfile(changelog) as f: - impl['notes'] = f.read() - changelog = None - unpack_size += bundle.getmember(arcname).size - spec = bundle.get_spec() - context_meta = _load_context_metadata(bundle, spec) - if 'requires' in impl: - spec.requires.update(parse_requires(impl.pop('requires'))) - - context = impl['context'] = spec['context'] - impl['version'] = spec['version'] - impl['stability'] = spec['stability'] - if spec['license'] is not EMPTY_LICENSE: - impl['license'] = spec['license'] - requires = impl['requires'] = [] - for dep_name, dep in spec.requires.items(): - found = False - for version in dep.versions_range(): - requires.append('%s-%s' % (dep_name, version)) - found = True - if not found: - requires.append(dep_name) - - data['spec'] = {'*-*': { - 'commands': spec.commands, - 'requires': spec.requires, - }} - data['unpack_size'] = unpack_size - data['mime_type'] = 'application/vnd.olpc-sugar' - - if initial and not contexts.exists(context): - context_meta['type'] = 'activity' - request.call(method='POST', path=['context'], content=context_meta) - context_meta = None - - enforce(context, 'Context is not specified') - enforce('version' in impl, 'Version is not specified') - enforce(context_type in contexts.get(context)['type'], - http.BadRequest, 'Inappropriate bundle type') - if 'license' not in impl: - existing, total = impls.find( - context=context, order_by='-version', not_layer='deleted') - enforce(total, 'License is not specified') - impl['license'] = next(existing)['license'] - - digest = hashlib.sha1() - with file(bundle_path, 'rb') as f: - while True: - chunk = f.read(toolkit.BUFFER_SIZE) - if not chunk: - break - digest.update(chunk) - data['digest'] = digest.hexdigest() - - yield impl - - existing, __ = impls.find( - context=context, version=impl['version'], not_layer='deleted') - if 'url' not in data: - data['blob'] = bundle_path - impl['guid'] = request.call(method='POST', path=['release'], content=impl) - for i in existing: - layer = i['layer'] + ['deleted'] - impls.update(i.guid, {'layer': layer}) - - if 'origin' in impls.get(impl['guid']).layer: - diff = contexts.patch(context, context_meta) - if diff: - request.call(method='PUT', path=['context', context], content=diff) - - -def _load_context_metadata(bundle, spec): - result = {} - for prop in ('homepage', 'mime_types'): - if spec[prop]: - result[prop] = spec[prop] - result['guid'] = spec['context'] - - try: - icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon'])) - Context.populate_images(result, icon_file.read()) - icon_file.close() - except Exception: - exception(_logger, 'Failed to load icon') - - msgids = {} - for prop, confname in [ - ('title', 'name'), - ('summary', 'summary'), - ('description', 'description'), - ]: - if spec[confname]: - msgids[prop] = spec[confname] - result[prop] = {'en': spec[confname]} - with toolkit.mkdtemp() as tmpdir: - for path in bundle.get_names(): - if not path.endswith('.mo'): - continue - mo_path = path.strip(os.sep).split(os.sep) - if len(mo_path) != 5 or mo_path[1] != 'locale': - continue - lang = mo_path[2] - bundle.extract(path, tmpdir) - try: - i18n = gettext.translation(spec['context'], - join(tmpdir, *mo_path[:2]), [lang]) - for prop, value in msgids.items(): - msgstr = i18n.gettext(value).decode('utf8') - if lang == 'en' or msgstr != value: - result[prop][lang] = msgstr - except Exception: - exception(_logger, 'Gettext failed to read %r', mo_path[-1]) - - return result diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 69584be..2d60ea8 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -23,9 +23,10 @@ from gettext import gettext as _ from sugar_network import node, toolkit from sugar_network.client import api_url -from sugar_network.node import sync, stats_user, files, volume +from sugar_network.node import sync, stats_user, files, model from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, enforce @@ -55,7 +56,7 @@ class SlaveRoutes(NodeRoutes): # loosing payload after authentication conn.get(cmd='logon') - push = [('diff', None, volume.diff(self.volume, self._push_seq))] + push = [('diff', None, model.diff(self.volume, self._push_seq))] if not no_pull: push.extend([ ('pull', { @@ -119,7 +120,7 @@ class SlaveRoutes(NodeRoutes): }, None)) push.append(('files_pull', {'sequence': self._files_seq}, None)) - self.broadcast({ + this.broadcast({ 'event': 'sync_progress', 'progress': _('Reading sneakernet packages'), }) @@ -129,14 +130,14 @@ class SlaveRoutes(NodeRoutes): if exists(offline_script): shutil.copy(offline_script, path) - self.broadcast({ + this.broadcast({ 'event': 'sync_progress', 'progress': _('Generating new sneakernet package'), }) diff_seq = toolkit.Sequence([]) push.append(('diff', None, - volume.diff(self.volume, push_seq, diff_seq))) + model.diff(self.volume, push_seq, diff_seq))) if stats_user.stats_user.value: push.append(('stats_diff', None, stats_user.diff(stats_seq))) complete = sync.sneakernet_encode(push, root=path, @@ -156,7 +157,7 @@ class SlaveRoutes(NodeRoutes): if packet.name == 'diff': _logger.debug('Processing %r', packet) - seq, __ = volume.merge(self.volume, packet, shift_seqno=False) + seq, __ = model.merge(self.volume, packet, shift_seqno=False) if from_master and seq: self._pull_seq.exclude(seq) self._pull_seq.commit() diff --git a/sugar_network/node/stats_node.py b/sugar_network/node/stats_node.py deleted file mode 100644 index d37819b..0000000 --- a/sugar_network/node/stats_node.py +++ /dev/null @@ -1,311 +0,0 @@ -# Copyright (C) 2012-2014 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import time -import json -import logging -from os.path import exists, join - -from sugar_network.toolkit.rrd import Rrd -from sugar_network.toolkit import Option - - -stats_node = Option( - 'collect unpersonalized node statistics', - default=False, type_cast=Option.bool_cast, action='store_true') - -stats_node_step = Option( - 'step interval in seconds for node RRD databases', - default=60 * 5, type_cast=int) - -stats_node_rras = Option( - 'comma separated list of RRAs for node RRD databases', - default=[ - 'RRA:AVERAGE:0.5:1:864', # 3d with 5min step - 'RRA:AVERAGE:0.5:288:3660', # 10y with 1d step - 'RRA:AVERAGE:0.5:2880:366', # 10y with 10d step - 'RRA:AVERAGE:0.5:8640:122', # 10y with 30d step - 'RRA:AVERAGE:0.5:105408:10', # 10y with 1y step - ], - type_cast=Option.list_cast, type_repr=Option.list_repr) - -_HEARTBEAT = 60 * 60 * 24 * 365 - -_logger = logging.getLogger('node.stats_node') - - -class Sniffer(object): - - def __init__(self, volume, path, reset=False): - _logger.info('Collect node stats in %r', path) - - self._volume = volume - self._rrd = Rrd(path, stats_node_step.value, stats_node_rras.value) - self._stats = {} - self._suspend_path = join(path, '.suspend') - self._last = int(time.time()) - - for name, cls in _STATS.items(): - stats = self._stats[name] = cls(self._stats, volume) - fields = {} - for field in stats: - fields[field] = 'DS:%s:GAUGE:%s:U:U' % (field, _HEARTBEAT) - if fields: - if not reset: - stats.update(self._rrd[name].last_ds) - stats['total'] = volume[name].find(limit=0)[1] - self._rrd[name].fields = fields - - if exists(self._suspend_path): - with file(self._suspend_path) as f: - suspend = json.load(f) - for name, stats in self._stats.items(): - if name not in suspend['state']: - continue - total_stats, stats.objects = suspend['state'][name] - stats.update(total_stats) - if suspend['timestamp'] < int(time.time()): - self.commit(suspend['timestamp']) - self.commit_objects() - os.unlink(self._suspend_path) - - def __getitem__(self, name): - return self._rrd[name] - - def suspend(self): - state = dict([(i, (j, j.objects)) for i, j in self._stats.items()]) - with file(self._suspend_path, 'w') as f: - json.dump({ - 'timestamp': self._last + stats_node_step.value, - 'state': state, - }, f) - - def log(self, request): - if request.cmd or request.resource not in _STATS: - return - self._stats[request.resource].log(request) - - def commit(self, timestamp=None, extra_values=None): - _logger.trace('Commit node stats') - - for resource, stats in self._stats.items(): - if resource not in self._rrd: - continue - values = stats.copy() - if extra_values and resource in extra_values: - values.update(extra_values[resource]) - if values: - self._rrd[resource].put(values, timestamp=timestamp) - - self._last = timestamp or int(time.time()) - - def commit_objects(self, reset=False): - _logger.trace('Commit object stats') - - for resource, stats in self._stats.items(): - old = { - 'downloads': 0, - 'rating': (0, 0), - } - directory = self._volume[resource] - for guid, new in stats.objects.items(): - if not directory.exists(guid): - _logger.warning('Ignore stats for missed %r %s', - guid, resource) - continue - if not reset: - old = directory.get(guid) - patch = {} - if 'downloads' in new: - patch['downloads'] = new['downloads'] + old['downloads'] - if 'votes' in new: - votes, rating = old['rating'] - votes += new['votes'] - rating += new['rating'] - patch['rating'] = [votes, rating] - directory.update(guid, patch) - stats.objects.clear() - - def report(self, dbs, start, end, records): - result = {} - - rdbs = [self._rrd[i] for i in dbs if i in self._rrd] - if not rdbs: - return result - - if not start: - start = min([i.first for i in rdbs]) or 0 - if not end: - end = max([i.last for i in rdbs]) or 0 - resolution = max(1, (end - start) / records) - - _logger.debug('Report start=%s end=%s resolution=%s dbs=%r', - start, end, resolution, dbs) - - for rdb in rdbs: - info = result[rdb.name] = [] - for ts, ds_values in rdb.get(start, end, resolution): - values = {} - for name in dbs[rdb.name]: - values[name] = ds_values.get(name) - info.append((ts, values)) - - return result - - -class _Stats(dict): - - RESOURCE = None - PARENTS = [] - - def __init__(self, stats, volume): - self.objects = {} - self._stats = stats - self._volume = volume - - def inc(self, guid, prop, value=1): - obj = self.objects.setdefault(guid, {}) - if prop not in obj: - obj[prop] = value - else: - obj[prop] += value - - def log(self, request): - pass - - -class _ResourceStats(_Stats): - - def __init__(self, stats, volume): - _Stats.__init__(self, stats, volume) - self['total'] = 0 - - def log(self, request): - if request.method == 'POST': - self['total'] += 1 - elif request.method == 'DELETE': - self['total'] -= 1 - - def parse_context(self, request): - context = None - directory = self._volume[self.RESOURCE] - - def parse_context(props): - for prop, resource in self.PARENTS: - guid = props.get(prop) - if not guid: - continue - if resource == 'context': - return guid - else: - return self._volume[resource].get(guid)['context'] - - if request.method == 'GET': - if not request.guid: - context = parse_context(request) - elif self.RESOURCE == 'context': - context = request.guid - elif self.RESOURCE != 'user': - context = directory.get(request.guid)['context'] - elif request.method == 'PUT': - if self.RESOURCE == 'context': - context = request.guid - else: - context = request.content.get('context') - if not context: - context = directory.get(request.guid)['context'] - elif request.method == 'POST': - context = parse_context(request.content) - - return context - - -class _UserStats(_ResourceStats): - - RESOURCE = 'user' - - -class _ContextStats(_ResourceStats): - - RESOURCE = 'context' - - def __init__(self, stats, volume): - _ResourceStats.__init__(self, stats, volume) - self['released'] = 0 - self['failed'] = 0 - self['downloaded'] = 0 - - -class _ReleaseStats(_Stats): - - RESOURCE = 'release' - PARENTS = [('context', 'context')] - - def log(self, request): - if request.method == 'GET': - if request.prop == 'data': - context = self._volume[self.RESOURCE].get(request.guid) - self._stats['context'].inc(context.context, 'downloads') - self._stats['context']['downloaded'] += 1 - elif request.method == 'POST': - self._stats['context']['released'] += 1 - - -class _ReportStats(_Stats): - - RESOURCE = 'report' - PARENTS = [('context', 'context'), ('release', 'release')] - - def log(self, request): - if request.method == 'POST': - self._stats['context']['failed'] += 1 - - -class _PostStats(_ResourceStats): - - RESOURCE = 'post' - PARENTS = [('context', 'context'), ('topic', 'post')] - - def __init__(self, stats, volume): - _ResourceStats.__init__(self, stats, volume) - self['downloaded'] = 0 - - def log(self, request): - _ResourceStats.log(self, request) - - if request.method == 'POST': - stats = None - if request.content['type'] == 'review': - stats = self._stats['context'] - guid = request.content['context'] - elif request.content['type'] == 'feedback': - stats = self._stats['post'] - guid = request.content['topic'] - if stats: - stats.inc(guid, 'votes') - stats.inc(guid, 'rating', request.content.get('vote') or 0) - - elif request.method == 'GET' and request.prop == 'data': - self.inc(request.guid, 'downloads') - self['downloaded'] += 1 - - -_STATS = {_UserStats.RESOURCE: _UserStats, - _ContextStats.RESOURCE: _ContextStats, - _ReleaseStats.RESOURCE: _ReleaseStats, - _ReportStats.RESOURCE: _ReportStats, - _PostStats.RESOURCE: _PostStats, - } diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py index b0a20bf..f5b946c 100644 --- a/sugar_network/node/sync.py +++ b/sugar_network/node/sync.py @@ -199,7 +199,7 @@ def _encode(limit, packets, download_blobs, header, status): pos = (yield chunk) or 0 blob_size -= len(chunk) enforce(blob_size == 0, EOFError, - 'Blob size is not the same as declared') + 'File size is not the same as declared') record = next(content) except StopIteration: diff --git a/sugar_network/node/volume.py b/sugar_network/node/volume.py deleted file mode 100644 index 0c254f7..0000000 --- a/sugar_network/node/volume.py +++ /dev/null @@ -1,142 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import logging - -from sugar_network import toolkit -from sugar_network.toolkit.router import Request -from sugar_network.toolkit import http, coroutine, enforce - - -# Apply node level layer for these documents -_LIMITED_RESOURCES = ('context', 'release') - -_logger = logging.getLogger('node.volume') - - -def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, - fetch_blobs=False, ignore_documents=None, **kwargs): - connection = http.Connection() - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_the_only_seq = not out_seq - if layer: - if isinstance(layer, basestring): - layer = [layer] - layer.append('common') - try: - for resource, directory in volume.items(): - if ignore_documents and resource in ignore_documents: - continue - coroutine.dispatch() - directory.commit() - yield {'resource': resource} - for guid, patch in directory.diff(in_seq, exclude_seq, - layer=layer if resource in _LIMITED_RESOURCES else None): - adiff = {} - adiff_seq = toolkit.Sequence() - for prop, meta, seqno in patch: - if 'blob' in meta: - blob_path = meta.pop('blob') - yield {'guid': guid, - 'diff': {prop: meta}, - 'blob_size': meta['blob_size'], - 'blob': toolkit.iter_file(blob_path), - } - elif fetch_blobs and 'url' in meta: - url = meta.pop('url') - try: - blob = connection.request('GET', url, - allow_redirects=True, - # We need uncompressed size - headers={'Accept-Encoding': ''}) - except Exception: - _logger.exception('Cannot fetch %r for %s:%s:%s', - url, resource, guid, prop) - is_the_only_seq = False - continue - yield {'guid': guid, - 'diff': {prop: meta}, - 'blob_size': - int(blob.headers['Content-Length']), - 'blob': blob.iter_content(toolkit.BUFFER_SIZE), - } - else: - adiff[prop] = meta - adiff_seq.include(seqno, seqno) - if adiff: - yield {'guid': guid, 'diff': adiff} - out_seq.include(adiff_seq) - if is_the_only_seq: - # There is only one diff, so, we can stretch it to remove all holes - out_seq.stretch() - except StopIteration: - pass - - yield {'commit': out_seq} - - -def merge(volume, records, shift_seqno=True, stats=None): - resource = None - directory = None - commit_seq = toolkit.Sequence() - merged_seq = toolkit.Sequence() - synced = False - - for record in records: - resource_ = record.get('resource') - if resource_: - resource = resource_ - directory = volume[resource_] - continue - - if 'guid' in record: - guid = record['guid'] - layer = [] - existed = directory.exists(guid) - if existed: - layer = directory.get(guid)['layer'] - - def update_stats(upd): - method = 'PUT' if existed else 'POST' - if ('deleted' in layer) != ('deleted' in upd.get('layer', [])): - if 'deleted' in layer: - # TODO - enforce(not 'supported yet') - else: - method = 'DELETE' - stats.log(Request( - method=method, - path=[resource, guid], - content=upd, - )) - - if stats is not None: - record['op'] = update_stats - seqno, merged = directory.merge(shift_seqno=shift_seqno, **record) - synced = synced or merged - if seqno is not None: - merged_seq.include(seqno, seqno) - continue - - commit = record.get('commit') - if commit is not None: - commit_seq.include(commit) - continue - - if synced: - volume.broadcast({'event': 'sync'}) - - return commit_seq, merged_seq |