Diffstat (limited to 'sugar_network/node')
-rw-r--r--  sugar_network/node/master.py        73
-rw-r--r--  sugar_network/node/model.py        177
-rw-r--r--  sugar_network/node/obs.py          116
-rw-r--r--  sugar_network/node/routes.py       466
-rw-r--r--  sugar_network/node/slave.py         13
-rw-r--r--  sugar_network/node/stats_node.py   311
-rw-r--r--  sugar_network/node/sync.py           2
-rw-r--r--  sugar_network/node/volume.py       142
8 files changed, 310 insertions, 990 deletions
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index 19a8cf1..c7c22e0 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -20,12 +20,19 @@ from Cookie import SimpleCookie
from os.path import join
from sugar_network import node, toolkit
-from sugar_network.node import sync, stats_user, files, volume, downloads, obs
+from sugar_network.node import sync, stats_user, files, model, downloads, obs
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
-from sugar_network.toolkit import http, coroutine, enforce
+from sugar_network.toolkit import http, enforce
+RESOURCES = (
+ 'sugar_network.node.model',
+ 'sugar_network.model.post',
+ 'sugar_network.model.report',
+ 'sugar_network.model.user',
+ )
+
_ONE_WAY_DOCUMENTS = ['report']
_logger = logging.getLogger('node.master')
@@ -33,12 +40,12 @@ _logger = logging.getLogger('node.master')
class MasterRoutes(NodeRoutes):
- def __init__(self, guid, volume_):
- NodeRoutes.__init__(self, guid, volume_)
+ def __init__(self, guid, volume, **kwargs):
+ NodeRoutes.__init__(self, guid, volume=volume, **kwargs)
self._pulls = {
'pull': lambda **kwargs:
- ('diff', None, volume.diff(self.volume,
+ ('diff', None, model.diff(self.volume,
ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)),
'files_pull': lambda **kwargs:
('files_diff', None, self._files.diff(**kwargs)),
@@ -50,7 +57,7 @@ class MasterRoutes(NodeRoutes):
if node.files_root.value:
self._files = files.Index(node.files_root.value,
- join(volume_.root, 'files.index'), volume_.seqno)
+ join(volume.root, 'files.index'), volume.seqno)
@route('POST', cmd='sync',
acl=ACL.AUTH)
@@ -137,25 +144,13 @@ class MasterRoutes(NodeRoutes):
enforce(node.files_root.value, http.BadRequest, 'Disabled')
aliases = self.volume['context'].get(request.guid)['aliases']
enforce(aliases, http.BadRequest, 'Nothing to presolve')
- return obs.presolve(aliases, node.files_root.value)
+ return obs.presolve(None, aliases, node.files_root.value)
def status(self):
result = NodeRoutes.status(self)
result['level'] = 'master'
return result
- def after_post(self, doc):
- if doc.metadata.name == 'context':
- shift_releases = doc.modified('dependencies')
- if doc.modified('aliases'):
- # TODO Already launched job should be killed
- coroutine.spawn(self._resolve_aliases, doc)
- shift_releases = True
- if shift_releases and not doc.is_new:
- # Shift checkpoint to invalidate solutions
- self.volume['release'].checkpoint()
- NodeRoutes.after_post(self, doc)
-
def _push(self, stream):
reply = []
cookie = _Cookie()
@@ -172,8 +167,7 @@ class MasterRoutes(NodeRoutes):
if self._files is not None:
cookie['files_pull'].include(packet['sequence'])
elif packet.name == 'diff':
- seq, ack_seq = volume.merge(self.volume, packet,
- stats=self._stats)
+ seq, ack_seq = model.merge(self.volume, packet)
reply.append(('ack', {
'ack': ack_seq,
'sequence': seq,
@@ -189,43 +183,6 @@ class MasterRoutes(NodeRoutes):
return reply, cookie
- def _resolve_aliases(self, doc):
- packages = {}
- for repo in obs.get_repos():
- alias = doc['aliases'].get(repo['distributor_id'])
- if not alias:
- continue
- package = packages[repo['name']] = {}
- for kind in ('binary', 'devel'):
- obs_fails = []
- for to_resolve in alias.get(kind) or []:
- if not to_resolve:
- continue
- try:
- for arch in repo['arches']:
- obs.resolve(repo['name'], arch, to_resolve)
- except Exception, error:
- _logger.warning('Failed to resolve %r on %s',
- to_resolve, repo['name'])
- obs_fails.append(str(error))
- continue
- package[kind] = to_resolve
- break
- else:
- package['status'] = '; '.join(obs_fails)
- break
- else:
- if 'binary' in package:
- package['status'] = 'success'
- else:
- package['status'] = 'no packages to resolve'
-
- if packages != doc['packages']:
- self.volume['context'].update(doc.guid, {'packages': packages})
-
- if node.files_root.value:
- obs.presolve(doc['aliases'], node.files_root.value)
-
class _Cookie(list):
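
In the reworked _push() above, incoming 'diff' packets go straight to model.merge(), and the master acknowledges with the two sequences it returns. A minimal sketch of that reply assembly, assuming a stand-in packet object (the real ones come from sugar_network.node.sync):

from sugar_network.node import model

def ack_for(volume, packet):
    # merge() returns (commit_seq, merged_seq): the seqno ranges the
    # sender declared committed, and the local seqnos the merge covered.
    seq, ack_seq = model.merge(volume, packet)
    return ('ack', {'ack': ack_seq, 'sequence': seq})
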
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
new file mode 100644
index 0000000..2681b2d
--- /dev/null
+++ b/sugar_network/node/model.py
@@ -0,0 +1,177 @@
+# Copyright (C) 2012-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from sugar_network import db, toolkit
+from sugar_network.model import Release, context
+from sugar_network.node import obs
+from sugar_network.toolkit.router import ACL
+from sugar_network.toolkit.coroutine import this
+from sugar_network.toolkit import http, coroutine, enforce
+
+
+_logger = logging.getLogger('node.model')
+_presolve_queue = None
+
+
+class _Release(Release):
+
+ _package_cast = db.Dict(db.List())
+
+ def typecast(self, value):
+ if not this.resource.exists or 'package' not in this.resource['type']:
+ return Release.typecast(self, value)
+
+ value = self._package_cast.typecast(value)
+ enforce(value.get('binary'), http.BadRequest, 'No binary aliases')
+
+ distro = this.request.key
+ if distro == '*':
+ lsb_id = None
+ lsb_release = None
+ elif '-' in this.request.key:
+ lsb_id, lsb_release = distro.split('-', 1)
+ else:
+ lsb_id = distro
+ lsb_release = None
+ releases = this.resource.record.get('releases')
+ statuses = releases['value'].setdefault('status', {})
+ to_presolve = []
+
+ for repo in obs.get_repos():
+ if lsb_id and lsb_id != repo['lsb_id'] or \
+ lsb_release and lsb_release != repo['lsb_release']:
+ continue
+ # Make sure there are no alias overrides
+ if not lsb_id and repo['lsb_id'] in releases['value'] or \
+ not lsb_release and repo['name'] in releases['value']:
+ continue
+ pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], [])
+ try:
+ for arch in repo['arches']:
+ obs.resolve(repo['name'], arch, pkgs)
+ except Exception, error:
+ _logger.warning('Failed to resolve %r on %s',
+ pkgs, repo['name'])
+ status = str(error)
+ else:
+ to_presolve.append((repo['name'], pkgs))
+ status = 'success'
+ statuses[repo['name']] = status
+
+ if to_presolve and _presolve_queue is not None:
+ _presolve_queue.put(to_presolve)
+ if statuses:
+ this.resource.record.set('releases', **releases)
+
+ return value
+
+ def teardown(self, value):
+ if 'package' not in this.resource['type']:
+ return Release.typecast(self, value)
+ # TODO Delete presolved files
+
+
+class Context(context.Context):
+
+ @db.stored_property(db.Aggregated, subtype=_Release(),
+ acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE)
+ def releases(self, value):
+ return value
+
+ @releases.setter
+ def releases(self, value):
+ if value or this.request.method != 'POST':
+ self.invalidate_solutions()
+ return value
+
+
+def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None,
+ ignore_documents=None, **kwargs):
+ if out_seq is None:
+ out_seq = toolkit.Sequence([])
+ is_the_only_seq = not out_seq
+ if layer:
+ if isinstance(layer, basestring):
+ layer = [layer]
+ layer.append('common')
+ try:
+ for resource, directory in volume.items():
+ if ignore_documents and resource in ignore_documents:
+ continue
+ coroutine.dispatch()
+ directory.commit()
+ yield {'resource': resource}
+ for guid, patch in directory.diff(in_seq, exclude_seq,
+ layer=layer if resource == 'context' else None):
+ adiff = {}
+ adiff_seq = toolkit.Sequence()
+ for prop, meta, seqno in patch:
+ adiff[prop] = meta
+ adiff_seq.include(seqno, seqno)
+ if adiff:
+ yield {'guid': guid, 'diff': adiff}
+ out_seq.include(adiff_seq)
+ if is_the_only_seq:
+ # There is only one diff, so, we can stretch it to remove all holes
+ out_seq.stretch()
+ except StopIteration:
+ pass
+
+ yield {'commit': out_seq}
+
+
+def merge(volume, records):
+ directory = None
+ commit_seq = toolkit.Sequence()
+ merged_seq = toolkit.Sequence()
+ synced = False
+
+ for record in records:
+ resource_ = record.get('resource')
+ if resource_:
+ resource = resource_
+ directory = volume[resource_]
+ continue
+
+ if 'guid' in record:
+ guid = record['guid']
+ existed = directory.exists(guid)
+ if existed:
+ layer = directory.get(guid)['layer']
+ seqno, merged = directory.merge(**record)
+ synced = synced or merged
+ if seqno is not None:
+ merged_seq.include(seqno, seqno)
+ continue
+
+ commit = record.get('commit')
+ if commit is not None:
+ commit_seq.include(commit)
+ continue
+
+ if synced:
+ this.broadcast({'event': 'sync'})
+
+ return commit_seq, merged_seq
+
+
+def presolve(presolve_path):
+ global _presolve_queue
+ _presolve_queue = coroutine.Queue()
+
+ for repo_name, pkgs in _presolve_queue:
+ obs.presolve(repo_name, pkgs, presolve_path)
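
The diff()/merge() pair above (moved here from the deleted node/volume.py) speaks a flat record stream: one {'resource': ...} header per directory, {'guid': ..., 'diff': ...} entries for changed documents, and a closing {'commit': ...} sequence. A self-contained sketch of walking that stream the way merge() does, on made-up records:

def consume(records):
    # Mirrors the dispatch in merge() above, minus the real directories.
    resource = None
    merged = []
    commit = None
    for record in records:
        if 'resource' in record:
            resource = record['resource']   # switch target directory
        elif 'guid' in record:
            merged.append((resource, record['guid']))
        elif 'commit' in record:
            commit = record['commit']       # seqnos the sender covered
    return merged, commit

records = [
    {'resource': 'context'},
    {'guid': 'org.sugarlabs.Chat', 'diff': {'title': {'value': 'Chat'}}},
    {'commit': [[1, 3]]},
]
assert consume(records) == ([('context', 'org.sugarlabs.Chat')], [[1, 3]])
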
diff --git a/sugar_network/node/obs.py b/sugar_network/node/obs.py
index 1d8a547..6ef9e55 100644
--- a/sugar_network/node/obs.py
+++ b/sugar_network/node/obs.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -37,7 +37,7 @@ obs_presolve_project = Option(
default='presolve')
_logger = logging.getLogger('node.obs')
-_client = None
+_conn = None
_repos = {}
@@ -45,82 +45,68 @@ def get_repos():
return _get_repos(obs_project.value)
-def resolve(repo, arch, names):
- for package in names:
- _request('GET', ['resolve'], params={
- 'project': obs_project.value,
- 'repository': repo,
- 'arch': arch,
- 'package': package,
- })
+def resolve(repo, arch, packages):
+ _request('GET', ['resolve'], params={
+ 'project': obs_project.value,
+ 'repository': repo,
+ 'arch': arch,
+ 'package': packages,
+ })
-def presolve(aliases, dst_path):
+def presolve(repo_name, packages, dst_path):
for repo in _get_repos(obs_presolve_project.value):
- # Presolves make sense only for XO, thus, for Fedora
- alias = aliases.get('Fedora')
- if not alias:
- continue
-
- name_variants = alias['binary']
- while name_variants:
- names = name_variants.pop()
- presolves = []
+ dst_dir = join(dst_path, 'packages',
+ obs_presolve_project.value, repo['name'])
+ result = {}
+ to_download = []
+
+ for package in packages:
+ files = result.setdefault(package, {})
try:
- for arch in repo['arches']:
- for package in names:
- response = _request('GET', ['resolve'], params={
- 'project': obs_presolve_project.value,
- 'repository': repo['name'],
- 'arch': arch,
- 'package': package,
- 'withdeps': '1',
- 'exclude': 'sweets-sugar',
- })
- binaries = []
- for pkg in response.findall('binary'):
- binaries.append(dict(pkg.items()))
- presolves.append((package, binaries))
+ for repo_arch in repo['arches']:
+ response = _request('GET', ['resolve'], params={
+ 'project': obs_presolve_project.value,
+ 'repository': '%(lsb_id)s-%(lsb_release)s' % repo,
+ 'arch': repo_arch,
+ 'package': package,
+ 'withdeps': '1',
+ 'exclude': 'sweets-sugar',
+ })
+ for binary in response.findall('binary'):
+ binary = dict(binary.items())
+ arch = binary.pop('arch')
+ url = binary.pop('url')
+ filename = binary['path'] = basename(url)
+ path = join(dst_dir, filename)
+ if not exists(path):
+ to_download.append((url, path))
+ files.setdefault(arch, []).append(binary)
except Exception:
toolkit.exception(_logger, 'Failed to presolve %r on %s',
- names, repo['name'])
+ packages, repo['name'])
continue
- _logger.debug('Presolve %r on %s', names, repo['name'])
-
- dst_dir = join(dst_path, 'packages',
- obs_presolve_project.value, repo['name'])
- if not exists(dst_dir):
- os.makedirs(dst_dir)
- result = {}
-
- for package, binaries in presolves:
- files = []
- for binary in binaries:
- arch = binary.pop('arch')
- if not files:
- result.setdefault(package, {})[arch] = files
- url = binary.pop('url')
- filename = binary['path'] = basename(url)
- path = join(dst_dir, filename)
- if not exists(path):
- _client.download(url, path)
- files.append(binary)
+ _logger.debug('Presolve %r on %s', packages, repo['name'])
- for package, info in result.items():
- with toolkit.new_file(join(dst_dir, package)) as f:
- json.dump(info, f)
+ if not exists(dst_dir):
+ os.makedirs(dst_dir)
+ for url, path in to_download:
+ _conn.download(url, path)
+ for package, info in result.items():
+ with toolkit.new_file(join(dst_dir, package)) as f:
+ json.dump(info, f)
- return {'repo': repo['name'], 'packages': result}
+ return {'repo': repo['name'], 'packages': result}
def _request(*args, **kwargs):
- global _client
+ global _conn
- if _client is None:
- _client = http.Connection(obs_url.value)
+ if _conn is None:
+ _conn = http.Connection(obs_url.value)
- response = _client.request(*args, allowed=(400, 404), **kwargs)
+ response = _conn.request(*args, allowed=(400, 404), **kwargs)
enforce(response.headers.get('Content-Type') == 'text/xml',
'Irregular OBS response')
reply = ElementTree.fromstring(response.content)
@@ -144,8 +130,10 @@ def _get_repos(project):
for repo in _request('GET', ['build', project]).findall('entry'):
repo = repo.get('name')
arches = _request('GET', ['build', project, repo])
+ lsb_id, lsb_release = repo.split('-', 1)
repos.append({
- 'distributor_id': repo.split('-', 1)[0],
+ 'lsb_id': lsb_id,
+ 'lsb_release': lsb_release,
'name': repo,
'arches': [i.get('name') for i in arches.findall('entry')],
})
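
resolve() above now sends the whole package list in a single OBS request instead of looping per name, and presolve() takes the repository name and package list directly rather than digging them out of an aliases dict. A hedged usage sketch; repository, package, and path names below are examples only:

from sugar_network.node import obs

# One request resolves the whole list against each arch of the repo.
obs.resolve('Fedora-14', 'i586', ['sugar-toolkit', 'gtk2-devel'])

# Download and index presolved binaries under <dst_path>/packages/...;
# master.py passes None for repo_name when presolving from aliases.
obs.presolve(None, ['sugar-toolkit'], '/var/lib/sugar-network/files')
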
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index eb48c70..6323cbc 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,23 +15,21 @@
import os
import time
-import shutil
-import gettext
import logging
import hashlib
-from contextlib import contextmanager
from ConfigParser import ConfigParser
from os.path import join, isdir, exists
-from sugar_network import node, toolkit, model
-from sugar_network.node import stats_node, stats_user
-from sugar_network.model.context import Context
+from sugar_network import db, node, toolkit, model
+from sugar_network.db import files
+from sugar_network.node import stats_user
# pylint: disable-msg=W0611
from sugar_network.toolkit.router import route, preroute, postroute, ACL
from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute
-from sugar_network.toolkit.spec import EMPTY_LICENSE
from sugar_network.toolkit.spec import parse_requires, ensure_requires
+from sugar_network.toolkit.spec import parse_version
from sugar_network.toolkit.bundle import Bundle
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import pylru, http, coroutine, exception, enforce
@@ -41,28 +39,16 @@ _AUTH_POOL_SIZE = 1024
_logger = logging.getLogger('node.routes')
-class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
+class NodeRoutes(db.Routes, model.FrontRoutes):
- def __init__(self, guid, volume):
- model.VolumeRoutes.__init__(self, volume)
+ def __init__(self, guid, **kwargs):
+ db.Routes.__init__(self, **kwargs)
model.FrontRoutes.__init__(self)
- volume.broadcast = self.broadcast
-
self._guid = guid
- self._stats = None
self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE)
self._auth_config = None
self._auth_config_mtime = 0
- if stats_node.stats_node.value:
- stats_path = join(node.stats_root.value, 'node')
- self._stats = stats_node.Sniffer(volume, stats_path)
- coroutine.spawn(self._commit_stats)
-
- def close(self):
- if self._stats is not None:
- self._stats.suspend()
-
@property
def guid(self):
return self._guid
@@ -80,33 +66,12 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
@route('GET', cmd='status', mime_type='application/json')
def status(self):
- documents = {}
- for name, directory in self.volume.items():
- documents[name] = {'mtime': directory.mtime}
- return {'guid': self._guid, 'resources': documents}
-
- @route('GET', cmd='stats', arguments={
- 'start': int, 'end': int, 'records': int, 'source': list},
- mime_type='application/json')
- def stats(self, start, end, records, source):
- enforce(self._stats is not None, 'Node stats is disabled')
- if not source:
- return {}
-
- if records > _MAX_STAT_RECORDS:
- _logger.debug('Decrease %d stats records number to %d',
- records, _MAX_STAT_RECORDS)
- records = _MAX_STAT_RECORDS
- elif records <= 0:
- records = _MAX_STAT_RECORDS / 10
-
- stats = {}
- for i in source:
- enforce('.' in i, 'Misnamed source')
- db_name, ds_name = i.split('.', 1)
- stats.setdefault(db_name, []).append(ds_name)
-
- return self._stats.report(stats, start, end, records)
+ return {'guid': self._guid,
+ 'seqno': {
+ 'db': self.volume.seqno.value,
+ 'releases': self.volume.releases_seqno.value,
+ },
+ }
@route('POST', ['user'], mime_type='application/json')
def register(self, request):
@@ -149,23 +114,19 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
response.content_type = 'application/json'
return result
- @route('POST', ['release'], cmd='submit',
+ @route('POST', ['context'], cmd='submit',
arguments={'initial': False},
mime_type='application/json', acl=ACL.AUTH)
- def submit_release(self, request, document):
- with toolkit.NamedTemporaryFile() as blob:
- shutil.copyfileobj(request.content_stream, blob)
- blob.flush()
- with load_bundle(self.volume, request, blob.name) as impl:
- impl['data']['blob'] = blob.name
- return impl['guid']
-
- @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
- def delete(self, request):
- # Servers data should not be deleted immediately
- # to let master-slave synchronization possible
- request.call(method='PUT', path=request.path,
- content={'layer': ['deleted']})
+ def submit_release(self, request, initial):
+ blob = files.post(request.content_stream)
+ try:
+ context, release = model.load_bundle(blob, initial=initial)
+ except Exception:
+ files.delete(blob.digest)
+ raise
+ this.call(method='POST', path=['context', context, 'releases'],
+ content_type='application/json', content=release)
+ return blob.digest
@route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER)
def attach(self, request):
@@ -186,43 +147,37 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
@route('GET', ['context', None], cmd='clone',
arguments={'requires': list})
def get_clone(self, request, response):
- return self._get_clone(request, response)
+ deps = {}
+ if 'requires' in request:
+ for i in request['requires']:
+ deps.update(parse_requires(i))
+ version = request.get('version')
+ if version:
+ version = parse_version(version)[0]
+ stability = request.get('stability') or 'stable'
+
+ recent = None
+ context = self.volume['context'][request.guid]
+ for release in context['releases'].values():
+ release = release.get('value')
+ if not release:
+ continue
+ spec = release['spec']['*-*']
+ if version and version != release['release'][0] or \
+ stability and stability != release['stability'] or \
+ deps and not ensure_requires(spec['requires'], deps):
+ continue
+ if recent is None or release['release'] > recent['release']:
+ recent = release
+ enforce(recent, http.NotFound, 'No releases found')
+
+ response.meta = recent
+ return files.get(recent['spec']['*-*']['bundle'])
@route('HEAD', ['context', None], cmd='clone',
arguments={'requires': list})
def head_clone(self, request, response):
- self._get_clone(request, response)
-
- @route('GET', ['context', None], cmd='deplist',
- mime_type='application/json', arguments={'requires': list})
- def deplist(self, request, repo):
- """List of native packages context is dependening on.
-
- Command return only GNU/Linux package names and ignores
- Sugar Network dependencies.
-
- :param repo:
- OBS repository name to get package names for, e.g.,
- Fedora-14
- :returns:
- list of package names
-
- """
- enforce(repo, 'Argument %r should be set', 'repo')
-
- spec = self._solve(request).meta('data')['spec']['*-*']
- common_deps = self.volume['context'].get(request.guid)['dependencies']
- result = []
-
- for package in set(spec.get('requires') or []) | set(common_deps):
- if package == 'sugar':
- continue
- dep = self.volume['context'].get(package)
- enforce(repo in dep['packages'],
- 'No packages for %r on %r', package, repo)
- result.extend(dep['packages'][repo].get('binary') or [])
-
- return result
+ self.get_clone(request, response)
@route('GET', ['user', None], cmd='stats-info',
mime_type='application/json', acl=ACL.AUTH)
@@ -246,15 +201,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
for timestamp, values in values:
rrd[name].put(values, timestamp)
- @route('GET', ['report', None], cmd='log', mime_type='text/html')
- def log(self, request):
- # In further implementations, `data` might be a tarball
- data = self.volume[request.resource].get(request.guid).meta('data')
- if data and 'blob' in data:
- return file(data['blob'], 'rb')
- else:
- return ''
-
@preroute
def preroute(self, op, request, response):
if op.acl & ACL.AUTH and request.principal is None:
@@ -277,22 +223,11 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
enforce(self.authorize(request.principal, 'root'), http.Forbidden,
'Operation is permitted only for superusers')
- @postroute
- def postroute(self, request, response, result, error):
- if error is None or isinstance(error, http.StatusPass):
- if self._stats is not None:
- self._stats.log(request)
-
- def on_create(self, request, props, event):
+ def on_create(self, request, props):
if request.resource == 'user':
- with file(props['pubkey']['blob']) as f:
+ with file(files.get(props['pubkey']).path) as f:
props['guid'] = str(hashlib.sha1(f.read()).hexdigest())
- model.VolumeRoutes.on_create(self, request, props, event)
-
- def on_update(self, request, props, event):
- model.VolumeRoutes.on_update(self, request, props, event)
- if 'deleted' in props.get('layer', []):
- event['event'] = 'delete'
+ db.Routes.on_create(self, request, props)
def on_aggprop_update(self, request, prop, value):
if prop.acl & ACL.AUTHOR:
@@ -300,27 +235,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
elif value is not None:
self._enforce_authority(request, value.get('author'))
- def find(self, request, reply):
- limit = request.get('limit')
- if limit is None or limit < 0:
- request['limit'] = node.find_limit.value
- elif limit > node.find_limit.value:
- _logger.warning('The find limit is restricted to %s',
- node.find_limit.value)
- request['limit'] = node.find_limit.value
- layer = request.setdefault('layer', [])
- if 'deleted' in layer:
- _logger.warning('Requesting "deleted" layer')
- layer.remove('deleted')
- request.add('not_layer', 'deleted')
- return model.VolumeRoutes.find(self, request, reply)
-
- def get(self, request, reply):
- doc = self.volume[request.resource].get(request.guid)
- enforce('deleted' not in doc['layer'], http.NotFound,
- 'Resource deleted')
- return model.VolumeRoutes.get(self, request, reply)
-
def authenticate(self, auth):
enforce(auth.scheme == 'sugar', http.BadRequest,
'Unknown authentication scheme')
@@ -329,8 +243,9 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
from M2Crypto import RSA
+ pubkey = self.volume['user'][auth.login]['pubkey']
+ key = RSA.load_pub_key(files.get(pubkey).path)
data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest()
- key = RSA.load_pub_key(self.volume['user'].path(auth.login, 'pubkey'))
enforce(key.verify(data, auth.signature.decode('hex')),
http.Forbidden, 'Bad credentials')
@@ -356,52 +271,6 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
return self._auth_config.get(user, role).strip().lower() in \
('true', 'on', '1', 'allow')
- def _commit_stats(self):
- while True:
- coroutine.sleep(stats_node.stats_node_step.value)
- self._stats.commit()
-
- def _solve(self, request):
- requires = {}
- if 'requires' in request:
- for i in request['requires']:
- requires.update(parse_requires(i))
- request.pop('requires')
- else:
- request['limit'] = 1
-
- if 'stability' not in request:
- request['stability'] = 'stable'
-
- impls, __ = self.volume['release'].find(
- context=request.guid, order_by='-version', not_layer='deleted',
- **request)
- impl = None
- for impl in impls:
- if requires:
- impl_deps = impl.meta('data')['spec']['*-*']['requires']
- if not ensure_requires(impl_deps, requires):
- continue
- break
- else:
- raise http.NotFound('No releases found')
- return impl
-
- def _get_clone(self, request, response):
- impl = self._solve(request)
- result = request.call(method=request.method,
- path=['release', impl['guid'], 'data'],
- response=response)
- response.meta = impl.properties([
- 'guid', 'ctime', 'layer', 'author', 'tags',
- 'context', 'version', 'stability', 'license', 'notes',
- ])
- response.meta['data'] = data = impl.meta('data')
- for key in ('mtime', 'seqno', 'blob'):
- if key in data:
- del data[key]
- return result
-
def _enforce_authority(self, request, author=None):
if request.resource == 'user':
allowed = (request.principal == request.guid)
@@ -412,222 +281,3 @@ class NodeRoutes(model.VolumeRoutes, model.FrontRoutes):
allowed = request.principal in author
enforce(allowed or self.authorize(request.principal, 'root'),
http.Forbidden, 'Operation is permitted only for authors')
-
-
-def generate_node_stats(volume, path):
- tmp_path = toolkit.mkdtemp()
- new_stats = stats_node.Sniffer(volume, tmp_path, True)
- old_stats = stats_node.Sniffer(volume, path)
-
- def timeline(ts):
- ts = long(ts)
- end = long(time.time())
- step = None
-
- archives = {}
- for rra in stats_node.stats_node_rras.value:
- a_step, a_size = [long(i) for i in rra.split(':')[-2:]]
- a_step *= stats_node.stats_node_step.value
- a_start = end - min(end, a_step * a_size)
- if archives.setdefault(a_start, a_step) > a_step:
- archives[a_start] = a_step
- archives = list(sorted(archives.items()))
-
- try:
- while ts <= end:
- while not step or archives and ts >= archives[0][0]:
- archive_start, step = archives.pop(0)
- ts = max(ts / step * step, archive_start)
- yield ts, ts + step - 1, step
- ts += step
- except GeneratorExit:
- shutil.rmtree(tmp_path, ignore_errors=True)
-
- start = next(volume['context'].find(limit=1, order_by='ctime')[0])['ctime']
- for left, right, step in timeline(start):
- for resource, props in [
- ('user', []),
- ('context', []),
- ('release', ['context']),
- ('report', ['context', 'release']),
- ('post', ['context', 'topic', 'type', 'vote']),
- ]:
- objs, __ = volume[resource].find(
- query='ctime:%s..%s' % (left, right))
- for obj in objs:
- request = Request(method='POST', path=[resource],
- content=obj.properties(props))
- new_stats.log(request)
- for resource, props in [
- ('user', ['layer']),
- ('context', ['layer']),
- ('release', ['layer']),
- ('report', ['layer']),
- ('post', ['layer']),
- ]:
- objs, __ = volume[resource].find(
- query='mtime:%s..%s' % (left, right))
- for obj in objs:
- if 'deleted' in obj['layer']:
- request = Request(method='DELETE',
- path=[resource, obj.guid])
- else:
- request = Request(method='PUT', path=[resource, obj.guid],
- content=obj.properties(props))
- new_stats.log(request)
- downloaded = {}
- for resource in ('context', 'post'):
- stats = old_stats.report(
- {resource: ['downloaded']}, left - step, right, 1)
- if not stats.get(resource):
- continue
- stats = stats[resource][-1][1].get('downloaded')
- if stats:
- downloaded[resource] = {'downloaded': stats}
- new_stats.commit(left + (right - left) / 2, downloaded)
-
- new_stats.commit_objects(True)
- shutil.rmtree(path)
- shutil.move(tmp_path, path)
-
-
-@contextmanager
-def load_bundle(volume, request, bundle_path):
- impl = request.copy()
- initial = False
- if 'initial' in impl:
- initial = impl.pop('initial')
- data = impl.setdefault('data', {})
- contexts = volume['context']
- context = impl.get('context')
- context_meta = None
- impls = volume['release']
-
- try:
- bundle = Bundle(bundle_path, mime_type='application/zip')
- except Exception:
- _logger.debug('Load unrecognized bundle from %r', bundle_path)
- context_type = 'book'
- else:
- _logger.debug('Load Sugar Activity bundle from %r', bundle_path)
- context_type = 'activity'
- unpack_size = 0
-
- with bundle:
- changelog = join(bundle.rootdir, 'CHANGELOG')
- for arcname in bundle.get_names():
- if changelog and arcname == changelog:
- with bundle.extractfile(changelog) as f:
- impl['notes'] = f.read()
- changelog = None
- unpack_size += bundle.getmember(arcname).size
- spec = bundle.get_spec()
- context_meta = _load_context_metadata(bundle, spec)
- if 'requires' in impl:
- spec.requires.update(parse_requires(impl.pop('requires')))
-
- context = impl['context'] = spec['context']
- impl['version'] = spec['version']
- impl['stability'] = spec['stability']
- if spec['license'] is not EMPTY_LICENSE:
- impl['license'] = spec['license']
- requires = impl['requires'] = []
- for dep_name, dep in spec.requires.items():
- found = False
- for version in dep.versions_range():
- requires.append('%s-%s' % (dep_name, version))
- found = True
- if not found:
- requires.append(dep_name)
-
- data['spec'] = {'*-*': {
- 'commands': spec.commands,
- 'requires': spec.requires,
- }}
- data['unpack_size'] = unpack_size
- data['mime_type'] = 'application/vnd.olpc-sugar'
-
- if initial and not contexts.exists(context):
- context_meta['type'] = 'activity'
- request.call(method='POST', path=['context'], content=context_meta)
- context_meta = None
-
- enforce(context, 'Context is not specified')
- enforce('version' in impl, 'Version is not specified')
- enforce(context_type in contexts.get(context)['type'],
- http.BadRequest, 'Inappropriate bundle type')
- if 'license' not in impl:
- existing, total = impls.find(
- context=context, order_by='-version', not_layer='deleted')
- enforce(total, 'License is not specified')
- impl['license'] = next(existing)['license']
-
- digest = hashlib.sha1()
- with file(bundle_path, 'rb') as f:
- while True:
- chunk = f.read(toolkit.BUFFER_SIZE)
- if not chunk:
- break
- digest.update(chunk)
- data['digest'] = digest.hexdigest()
-
- yield impl
-
- existing, __ = impls.find(
- context=context, version=impl['version'], not_layer='deleted')
- if 'url' not in data:
- data['blob'] = bundle_path
- impl['guid'] = request.call(method='POST', path=['release'], content=impl)
- for i in existing:
- layer = i['layer'] + ['deleted']
- impls.update(i.guid, {'layer': layer})
-
- if 'origin' in impls.get(impl['guid']).layer:
- diff = contexts.patch(context, context_meta)
- if diff:
- request.call(method='PUT', path=['context', context], content=diff)
-
-
-def _load_context_metadata(bundle, spec):
- result = {}
- for prop in ('homepage', 'mime_types'):
- if spec[prop]:
- result[prop] = spec[prop]
- result['guid'] = spec['context']
-
- try:
- icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon']))
- Context.populate_images(result, icon_file.read())
- icon_file.close()
- except Exception:
- exception(_logger, 'Failed to load icon')
-
- msgids = {}
- for prop, confname in [
- ('title', 'name'),
- ('summary', 'summary'),
- ('description', 'description'),
- ]:
- if spec[confname]:
- msgids[prop] = spec[confname]
- result[prop] = {'en': spec[confname]}
- with toolkit.mkdtemp() as tmpdir:
- for path in bundle.get_names():
- if not path.endswith('.mo'):
- continue
- mo_path = path.strip(os.sep).split(os.sep)
- if len(mo_path) != 5 or mo_path[1] != 'locale':
- continue
- lang = mo_path[2]
- bundle.extract(path, tmpdir)
- try:
- i18n = gettext.translation(spec['context'],
- join(tmpdir, *mo_path[:2]), [lang])
- for prop, value in msgids.items():
- msgstr = i18n.gettext(value).decode('utf8')
- if lang == 'en' or msgstr != value:
- result[prop][lang] = msgstr
- except Exception:
- exception(_logger, 'Gettext failed to read %r', mo_path[-1])
-
- return result
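
get_clone() above now selects straight from the context's releases aggregate: drop entries without a value, filter on requested version, stability, and requirements, then keep the entry with the highest release tuple. The same selection reduced to plain dicts (sample data; the ensure_requires() check is elided):

def pick_release(releases, version=None, stability='stable'):
    # Mirror of the selection loop in get_clone(), on plain dicts.
    recent = None
    for release in releases.values():
        value = release.get('value')
        if not value:
            continue
        if version and version != value['release'][0]:
            continue
        if stability and stability != value['stability']:
            continue
        if recent is None or value['release'] > recent['release']:
            recent = value
    return recent

releases = {
    '1': {'value': {'release': [[1], 0], 'stability': 'stable'}},
    '2': {'value': {'release': [[2], 0], 'stability': 'developer'}},
}
assert pick_release(releases)['stability'] == 'stable'
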
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 69584be..2d60ea8 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -23,9 +23,10 @@ from gettext import gettext as _
from sugar_network import node, toolkit
from sugar_network.client import api_url
-from sugar_network.node import sync, stats_user, files, volume
+from sugar_network.node import sync, stats_user, files, model
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, enforce
@@ -55,7 +56,7 @@ class SlaveRoutes(NodeRoutes):
# loosing payload after authentication
conn.get(cmd='logon')
- push = [('diff', None, volume.diff(self.volume, self._push_seq))]
+ push = [('diff', None, model.diff(self.volume, self._push_seq))]
if not no_pull:
push.extend([
('pull', {
@@ -119,7 +120,7 @@ class SlaveRoutes(NodeRoutes):
}, None))
push.append(('files_pull', {'sequence': self._files_seq}, None))
- self.broadcast({
+ this.broadcast({
'event': 'sync_progress',
'progress': _('Reading sneakernet packages'),
})
@@ -129,14 +130,14 @@ class SlaveRoutes(NodeRoutes):
if exists(offline_script):
shutil.copy(offline_script, path)
- self.broadcast({
+ this.broadcast({
'event': 'sync_progress',
'progress': _('Generating new sneakernet package'),
})
diff_seq = toolkit.Sequence([])
push.append(('diff', None,
- volume.diff(self.volume, push_seq, diff_seq)))
+ model.diff(self.volume, push_seq, diff_seq)))
if stats_user.stats_user.value:
push.append(('stats_diff', None, stats_user.diff(stats_seq)))
complete = sync.sneakernet_encode(push, root=path,
@@ -156,7 +157,7 @@ class SlaveRoutes(NodeRoutes):
if packet.name == 'diff':
_logger.debug('Processing %r', packet)
- seq, __ = volume.merge(self.volume, packet, shift_seqno=False)
+ seq, __ = model.merge(self.volume, packet, shift_seqno=False)
if from_master and seq:
self._pull_seq.exclude(seq)
self._pull_seq.commit()
diff --git a/sugar_network/node/stats_node.py b/sugar_network/node/stats_node.py
deleted file mode 100644
index d37819b..0000000
--- a/sugar_network/node/stats_node.py
+++ /dev/null
@@ -1,311 +0,0 @@
-# Copyright (C) 2012-2014 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import time
-import json
-import logging
-from os.path import exists, join
-
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.toolkit import Option
-
-
-stats_node = Option(
- 'collect unpersonalized node statistics',
- default=False, type_cast=Option.bool_cast, action='store_true')
-
-stats_node_step = Option(
- 'step interval in seconds for node RRD databases',
- default=60 * 5, type_cast=int)
-
-stats_node_rras = Option(
- 'comma separated list of RRAs for node RRD databases',
- default=[
- 'RRA:AVERAGE:0.5:1:864', # 3d with 5min step
- 'RRA:AVERAGE:0.5:288:3660', # 10y with 1d step
- 'RRA:AVERAGE:0.5:2880:366', # 10y with 10d step
- 'RRA:AVERAGE:0.5:8640:122', # 10y with 30d step
- 'RRA:AVERAGE:0.5:105408:10', # 10y with 1y step
- ],
- type_cast=Option.list_cast, type_repr=Option.list_repr)
-
-_HEARTBEAT = 60 * 60 * 24 * 365
-
-_logger = logging.getLogger('node.stats_node')
-
-
-class Sniffer(object):
-
- def __init__(self, volume, path, reset=False):
- _logger.info('Collect node stats in %r', path)
-
- self._volume = volume
- self._rrd = Rrd(path, stats_node_step.value, stats_node_rras.value)
- self._stats = {}
- self._suspend_path = join(path, '.suspend')
- self._last = int(time.time())
-
- for name, cls in _STATS.items():
- stats = self._stats[name] = cls(self._stats, volume)
- fields = {}
- for field in stats:
- fields[field] = 'DS:%s:GAUGE:%s:U:U' % (field, _HEARTBEAT)
- if fields:
- if not reset:
- stats.update(self._rrd[name].last_ds)
- stats['total'] = volume[name].find(limit=0)[1]
- self._rrd[name].fields = fields
-
- if exists(self._suspend_path):
- with file(self._suspend_path) as f:
- suspend = json.load(f)
- for name, stats in self._stats.items():
- if name not in suspend['state']:
- continue
- total_stats, stats.objects = suspend['state'][name]
- stats.update(total_stats)
- if suspend['timestamp'] < int(time.time()):
- self.commit(suspend['timestamp'])
- self.commit_objects()
- os.unlink(self._suspend_path)
-
- def __getitem__(self, name):
- return self._rrd[name]
-
- def suspend(self):
- state = dict([(i, (j, j.objects)) for i, j in self._stats.items()])
- with file(self._suspend_path, 'w') as f:
- json.dump({
- 'timestamp': self._last + stats_node_step.value,
- 'state': state,
- }, f)
-
- def log(self, request):
- if request.cmd or request.resource not in _STATS:
- return
- self._stats[request.resource].log(request)
-
- def commit(self, timestamp=None, extra_values=None):
- _logger.trace('Commit node stats')
-
- for resource, stats in self._stats.items():
- if resource not in self._rrd:
- continue
- values = stats.copy()
- if extra_values and resource in extra_values:
- values.update(extra_values[resource])
- if values:
- self._rrd[resource].put(values, timestamp=timestamp)
-
- self._last = timestamp or int(time.time())
-
- def commit_objects(self, reset=False):
- _logger.trace('Commit object stats')
-
- for resource, stats in self._stats.items():
- old = {
- 'downloads': 0,
- 'rating': (0, 0),
- }
- directory = self._volume[resource]
- for guid, new in stats.objects.items():
- if not directory.exists(guid):
- _logger.warning('Ignore stats for missed %r %s',
- guid, resource)
- continue
- if not reset:
- old = directory.get(guid)
- patch = {}
- if 'downloads' in new:
- patch['downloads'] = new['downloads'] + old['downloads']
- if 'votes' in new:
- votes, rating = old['rating']
- votes += new['votes']
- rating += new['rating']
- patch['rating'] = [votes, rating]
- directory.update(guid, patch)
- stats.objects.clear()
-
- def report(self, dbs, start, end, records):
- result = {}
-
- rdbs = [self._rrd[i] for i in dbs if i in self._rrd]
- if not rdbs:
- return result
-
- if not start:
- start = min([i.first for i in rdbs]) or 0
- if not end:
- end = max([i.last for i in rdbs]) or 0
- resolution = max(1, (end - start) / records)
-
- _logger.debug('Report start=%s end=%s resolution=%s dbs=%r',
- start, end, resolution, dbs)
-
- for rdb in rdbs:
- info = result[rdb.name] = []
- for ts, ds_values in rdb.get(start, end, resolution):
- values = {}
- for name in dbs[rdb.name]:
- values[name] = ds_values.get(name)
- info.append((ts, values))
-
- return result
-
-
-class _Stats(dict):
-
- RESOURCE = None
- PARENTS = []
-
- def __init__(self, stats, volume):
- self.objects = {}
- self._stats = stats
- self._volume = volume
-
- def inc(self, guid, prop, value=1):
- obj = self.objects.setdefault(guid, {})
- if prop not in obj:
- obj[prop] = value
- else:
- obj[prop] += value
-
- def log(self, request):
- pass
-
-
-class _ResourceStats(_Stats):
-
- def __init__(self, stats, volume):
- _Stats.__init__(self, stats, volume)
- self['total'] = 0
-
- def log(self, request):
- if request.method == 'POST':
- self['total'] += 1
- elif request.method == 'DELETE':
- self['total'] -= 1
-
- def parse_context(self, request):
- context = None
- directory = self._volume[self.RESOURCE]
-
- def parse_context(props):
- for prop, resource in self.PARENTS:
- guid = props.get(prop)
- if not guid:
- continue
- if resource == 'context':
- return guid
- else:
- return self._volume[resource].get(guid)['context']
-
- if request.method == 'GET':
- if not request.guid:
- context = parse_context(request)
- elif self.RESOURCE == 'context':
- context = request.guid
- elif self.RESOURCE != 'user':
- context = directory.get(request.guid)['context']
- elif request.method == 'PUT':
- if self.RESOURCE == 'context':
- context = request.guid
- else:
- context = request.content.get('context')
- if not context:
- context = directory.get(request.guid)['context']
- elif request.method == 'POST':
- context = parse_context(request.content)
-
- return context
-
-
-class _UserStats(_ResourceStats):
-
- RESOURCE = 'user'
-
-
-class _ContextStats(_ResourceStats):
-
- RESOURCE = 'context'
-
- def __init__(self, stats, volume):
- _ResourceStats.__init__(self, stats, volume)
- self['released'] = 0
- self['failed'] = 0
- self['downloaded'] = 0
-
-
-class _ReleaseStats(_Stats):
-
- RESOURCE = 'release'
- PARENTS = [('context', 'context')]
-
- def log(self, request):
- if request.method == 'GET':
- if request.prop == 'data':
- context = self._volume[self.RESOURCE].get(request.guid)
- self._stats['context'].inc(context.context, 'downloads')
- self._stats['context']['downloaded'] += 1
- elif request.method == 'POST':
- self._stats['context']['released'] += 1
-
-
-class _ReportStats(_Stats):
-
- RESOURCE = 'report'
- PARENTS = [('context', 'context'), ('release', 'release')]
-
- def log(self, request):
- if request.method == 'POST':
- self._stats['context']['failed'] += 1
-
-
-class _PostStats(_ResourceStats):
-
- RESOURCE = 'post'
- PARENTS = [('context', 'context'), ('topic', 'post')]
-
- def __init__(self, stats, volume):
- _ResourceStats.__init__(self, stats, volume)
- self['downloaded'] = 0
-
- def log(self, request):
- _ResourceStats.log(self, request)
-
- if request.method == 'POST':
- stats = None
- if request.content['type'] == 'review':
- stats = self._stats['context']
- guid = request.content['context']
- elif request.content['type'] == 'feedback':
- stats = self._stats['post']
- guid = request.content['topic']
- if stats:
- stats.inc(guid, 'votes')
- stats.inc(guid, 'rating', request.content.get('vote') or 0)
-
- elif request.method == 'GET' and request.prop == 'data':
- self.inc(request.guid, 'downloads')
- self['downloaded'] += 1
-
-
-_STATS = {_UserStats.RESOURCE: _UserStats,
- _ContextStats.RESOURCE: _ContextStats,
- _ReleaseStats.RESOURCE: _ReleaseStats,
- _ReportStats.RESOURCE: _ReportStats,
- _PostStats.RESOURCE: _PostStats,
- }
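
For reference, the RRA defaults the deleted module shipped do cover the spans its inline comments claim: with the 5-minute base step, each archive spans pdp-count x rows x 300 seconds, parsed the same way the deleted timeline() helper does. A quick check (Python 2, matching the codebase):

STEP = 60 * 5  # stats_node_step default, in seconds

for rra in ('RRA:AVERAGE:0.5:1:864', 'RRA:AVERAGE:0.5:288:3660',
            'RRA:AVERAGE:0.5:2880:366', 'RRA:AVERAGE:0.5:8640:122',
            'RRA:AVERAGE:0.5:105408:10'):
    pdps, rows = [int(i) for i in rra.split(':')[-2:]]
    print rra, '->', pdps * rows * STEP // 86400, 'days'
    # -> 3, 3660, 3660, 3660, 3660 days
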
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
index b0a20bf..f5b946c 100644
--- a/sugar_network/node/sync.py
+++ b/sugar_network/node/sync.py
@@ -199,7 +199,7 @@ def _encode(limit, packets, download_blobs, header, status):
pos = (yield chunk) or 0
blob_size -= len(chunk)
enforce(blob_size == 0, EOFError,
- 'Blob size is not the same as declared')
+ 'File size is not the same as declared')
record = next(content)
except StopIteration:
diff --git a/sugar_network/node/volume.py b/sugar_network/node/volume.py
deleted file mode 100644
index 0c254f7..0000000
--- a/sugar_network/node/volume.py
+++ /dev/null
@@ -1,142 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import logging
-
-from sugar_network import toolkit
-from sugar_network.toolkit.router import Request
-from sugar_network.toolkit import http, coroutine, enforce
-
-
-# Apply node level layer for these documents
-_LIMITED_RESOURCES = ('context', 'release')
-
-_logger = logging.getLogger('node.volume')
-
-
-def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None,
- fetch_blobs=False, ignore_documents=None, **kwargs):
- connection = http.Connection()
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_the_only_seq = not out_seq
- if layer:
- if isinstance(layer, basestring):
- layer = [layer]
- layer.append('common')
- try:
- for resource, directory in volume.items():
- if ignore_documents and resource in ignore_documents:
- continue
- coroutine.dispatch()
- directory.commit()
- yield {'resource': resource}
- for guid, patch in directory.diff(in_seq, exclude_seq,
- layer=layer if resource in _LIMITED_RESOURCES else None):
- adiff = {}
- adiff_seq = toolkit.Sequence()
- for prop, meta, seqno in patch:
- if 'blob' in meta:
- blob_path = meta.pop('blob')
- yield {'guid': guid,
- 'diff': {prop: meta},
- 'blob_size': meta['blob_size'],
- 'blob': toolkit.iter_file(blob_path),
- }
- elif fetch_blobs and 'url' in meta:
- url = meta.pop('url')
- try:
- blob = connection.request('GET', url,
- allow_redirects=True,
- # We need uncompressed size
- headers={'Accept-Encoding': ''})
- except Exception:
- _logger.exception('Cannot fetch %r for %s:%s:%s',
- url, resource, guid, prop)
- is_the_only_seq = False
- continue
- yield {'guid': guid,
- 'diff': {prop: meta},
- 'blob_size':
- int(blob.headers['Content-Length']),
- 'blob': blob.iter_content(toolkit.BUFFER_SIZE),
- }
- else:
- adiff[prop] = meta
- adiff_seq.include(seqno, seqno)
- if adiff:
- yield {'guid': guid, 'diff': adiff}
- out_seq.include(adiff_seq)
- if is_the_only_seq:
- # There is only one diff, so, we can stretch it to remove all holes
- out_seq.stretch()
- except StopIteration:
- pass
-
- yield {'commit': out_seq}
-
-
-def merge(volume, records, shift_seqno=True, stats=None):
- resource = None
- directory = None
- commit_seq = toolkit.Sequence()
- merged_seq = toolkit.Sequence()
- synced = False
-
- for record in records:
- resource_ = record.get('resource')
- if resource_:
- resource = resource_
- directory = volume[resource_]
- continue
-
- if 'guid' in record:
- guid = record['guid']
- layer = []
- existed = directory.exists(guid)
- if existed:
- layer = directory.get(guid)['layer']
-
- def update_stats(upd):
- method = 'PUT' if existed else 'POST'
- if ('deleted' in layer) != ('deleted' in upd.get('layer', [])):
- if 'deleted' in layer:
- # TODO
- enforce(not 'supported yet')
- else:
- method = 'DELETE'
- stats.log(Request(
- method=method,
- path=[resource, guid],
- content=upd,
- ))
-
- if stats is not None:
- record['op'] = update_stats
- seqno, merged = directory.merge(shift_seqno=shift_seqno, **record)
- synced = synced or merged
- if seqno is not None:
- merged_seq.include(seqno, seqno)
- continue
-
- commit = record.get('commit')
- if commit is not None:
- commit_seq.include(commit)
- continue
-
- if synced:
- volume.broadcast({'event': 'sync'})
-
- return commit_seq, merged_seq