# Copyright (C) 2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
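"""Master node HTTP routes.

Extends NodeRoutes with synchronization entry points (`sync`, `push`,
`pull`) and OBS package presolving for contexts.
"""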
import json
import base64
import logging
from Cookie import SimpleCookie
from os.path import join
from sugar_network import node, toolkit
from sugar_network.node import sync, stats_user, files, volume, downloads, obs
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
from sugar_network.toolkit import http, coroutine, enforce
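# Documents that are pushed to the master but never pulled back;
# they are excluded from outgoing diffs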
_ONE_WAY_DOCUMENTS = ['report']
_logger = logging.getLogger('node.master')
class MasterRoutes(NodeRoutes):
def __init__(self, guid, volume_):
NodeRoutes.__init__(self, guid, volume_)
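        # Factories producing (packet name, properties, diff iterator)
        # tuples for every pull operation a cookie may carry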
self._pulls = {
'pull': lambda **kwargs:
('diff', None, volume.diff(self.volume,
ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)),
'files_pull': lambda **kwargs:
('files_diff', None, self._files.diff(**kwargs)),
}
self._pull_queue = downloads.Pool(
join(toolkit.cachedir.value, 'pulls'))
self._files = None
if node.files_root.value:
self._files = files.Index(node.files_root.value,
join(volume_.root, 'files.index'), volume_.seqno)
@route('POST', cmd='sync',
acl=ACL.AUTH)
def sync(self, request):
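        """Process an online synchronization session.

        Merge all pushed packets first, then stream back diffs for every
        pull requested in the session.
        """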
reply, cookie = self._push(sync.decode(request.content_stream))
exclude_seq = None
if len(cookie.sent) == 1:
exclude_seq = cookie.sent.values()[0]
for op, layer, seq in cookie:
reply.append(self._pulls[op](in_seq=seq,
exclude_seq=exclude_seq, layer=layer))
return sync.encode(reply, src=self.guid)
@route('POST', cmd='push')
def push(self, request, response):
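        """Merge a pushed sneakernet package and reply with acks.

        Pulls that cannot be served inline are postponed in cookies (see
        the `pull` route).
        """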
reply, cookie = self._push(sync.package_decode(request.content_stream))
        # Read the passed cookie only after the pushed packet has been
        # processed: if the cookie carries a `pull` that is not part of
        # the current push, excluding `merged_seq` should not affect it.
cookie.update(_Cookie(request))
cookie.store(response)
return sync.package_encode(reply, src=self.guid)
@route('GET', cmd='pull',
mime_type='application/octet-stream',
arguments={'accept_length': int})
def pull(self, request, response, accept_length=None):
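        """Serve a pull packet prepared in the downloads pool.

        If no suitable packet is ready yet, schedule its generation and
        ask the client to retry later via the `sugar_network_delay`
        cookie.
        """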
cookie = _Cookie(request)
if not cookie:
_logger.warning('Requested full dump in pull command')
cookie.append(('pull', None, toolkit.Sequence([[1, None]])))
cookie.append(('files_pull', None, toolkit.Sequence([[1, None]])))
exclude_seq = None
if len(cookie.sent) == 1:
exclude_seq = toolkit.Sequence(cookie.sent.values()[0])
reply = None
for pull_key in cookie:
op, layer, seq = pull_key
pull = self._pull_queue.get(pull_key)
if pull is not None:
if not pull.ready:
continue
if not pull.tag:
self._pull_queue.remove(pull_key)
cookie.remove(pull_key)
continue
if accept_length is None or pull.length <= accept_length:
_logger.debug('Found ready to use %r', pull)
if pull.complete:
cookie.remove(pull_key)
else:
seq.exclude(pull.tag)
reply = pull.open()
break
_logger.debug('Existing %r is too big, will recreate', pull)
self._pull_queue.remove(pull_key)
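            # Nothing ready to serve; (re)start generating the pull in
            # the background, the client will be told to retry via the
            # cookie's delay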
out_seq = toolkit.Sequence()
pull = self._pull_queue.set(pull_key, out_seq,
sync.sneakernet_encode,
[self._pulls[op](in_seq=seq, out_seq=out_seq,
exclude_seq=exclude_seq, layer=layer,
fetch_blobs=True)],
limit=accept_length, src=self.guid)
_logger.debug('Start new %r', pull)
if reply is None:
if cookie:
_logger.debug('No ready pulls')
                # TODO It might be useful to set a meaningful value here
cookie.delay = node.pull_timeout.value
else:
_logger.debug('Nothing to pull')
cookie.store(response)
return reply
@route('PUT', ['context', None], cmd='presolve',
acl=ACL.AUTH, mime_type='application/json')
def presolve(self, request):
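        """Resolve the context's package aliases on OBS ahead of time."""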
enforce(node.files_root.value, http.BadRequest, 'Disabled')
aliases = self.volume['context'].get(request.guid)['aliases']
enforce(aliases, http.BadRequest, 'Nothing to presolve')
return obs.presolve(aliases, node.files_root.value)
def status(self):
result = NodeRoutes.status(self)
result['level'] = 'master'
return result
def after_post(self, doc):
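        """On context changes, re-resolve aliases and shift the
        implementation checkpoint to invalidate cached solutions.
        """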
if doc.metadata.name == 'context':
shift_implementations = doc.modified('dependencies')
if doc.modified('aliases'):
                # TODO An already running job should be killed first
coroutine.spawn(self._resolve_aliases, doc)
shift_implementations = True
if shift_implementations and not doc.is_new:
# Shift checkpoint to invalidate solutions
self.volume['implementation'].checkpoint()
NodeRoutes.after_post(self, doc)
def _push(self, stream):
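        """Merge incoming packets from `stream`.

        Return a (reply, cookie) tuple: packets to send back and the
        pulls postponed for later requests.
        """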
reply = []
cookie = _Cookie()
for packet in stream:
src = packet['src']
enforce(packet['dst'] == self.guid, 'Misaddressed packet')
if packet.name == 'pull':
pull_seq = cookie['pull', packet['layer'] or None]
pull_seq.include(packet['sequence'])
cookie.sent.setdefault(src, toolkit.Sequence())
elif packet.name == 'files_pull':
if self._files is not None:
cookie['files_pull'].include(packet['sequence'])
elif packet.name == 'diff':
seq, ack_seq = volume.merge(self.volume, packet,
stats=self._stats)
reply.append(('ack', {
'ack': ack_seq,
'sequence': seq,
'dst': src,
}, None))
sent_seq = cookie.sent.setdefault(src, toolkit.Sequence())
sent_seq.include(ack_seq)
elif packet.name == 'stats_diff':
reply.append(('stats_ack', {
'sequence': stats_user.merge(packet),
'dst': src,
}, None))
return reply, cookie
def _resolve_aliases(self, doc):
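        """Resolve the context's aliases against every OBS repository.

        Update the context's `packages` property with per-repository
        resolve results.
        """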
packages = {}
for repo in obs.get_repos():
alias = doc['aliases'].get(repo['distributor_id'])
if not alias:
continue
package = packages[repo['name']] = {}
for kind in ('binary', 'devel'):
obs_fails = []
for to_resolve in alias.get(kind) or []:
if not to_resolve:
continue
try:
for arch in repo['arches']:
obs.resolve(repo['name'], arch, to_resolve)
                    except Exception as error:
_logger.warning('Failed to resolve %r on %s',
to_resolve, repo['name'])
obs_fails.append(str(error))
continue
package[kind] = to_resolve
break
else:
package['status'] = '; '.join(obs_fails)
break
else:
if 'binary' in package:
package['status'] = 'success'
else:
package['status'] = 'no packages to resolve'
if packages != doc['packages']:
self.volume['context'].update(doc.guid, {'packages': packages})
if node.files_root.value:
obs.presolve(doc['aliases'], node.files_root.value)
class _Cookie(list):
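    """Pulls postponed between requests.

    A list of (op, layer, sequence) tuples, serialized together with the
    `sent` sequences as base64-encoded JSON cookies (see `store()`).
    """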
def __init__(self, request=None):
list.__init__(self)
self.sent = {}
self.delay = 0
if request is not None:
self.update(self._get_cookie(request, 'sugar_network_pull') or [])
self.sent = self._get_cookie(request, 'sugar_network_sent') or {}
def __repr__(self):
        return '<_Cookie %s sent=%r>' % (list.__repr__(self), self.sent)
def update(self, that):
for op, layer, seq in that:
self[op, layer].include(seq)
def store(self, response):
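        """Pack postponed pulls into response cookies, or unset them."""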
response.set('set-cookie', [])
if self:
_logger.debug('Postpone %r in cookie', self)
self._set_cookie(response, 'sugar_network_pull',
base64.b64encode(json.dumps(self)))
self._set_cookie(response, 'sugar_network_sent',
base64.b64encode(json.dumps(self.sent)))
self._set_cookie(response, 'sugar_network_delay', self.delay)
else:
self._unset_cookie(response, 'sugar_network_pull')
self._unset_cookie(response, 'sugar_network_sent')
self._unset_cookie(response, 'sugar_network_delay')
def __getitem__(self, key):
if not isinstance(key, tuple):
key = (key, None)
for op, layer, seq in self:
if (op, layer) == key:
return seq
seq = toolkit.Sequence()
self.append(key + (seq,))
return seq
def _get_cookie(self, request, name):
cookie_str = request.environ.get('HTTP_COOKIE')
if not cookie_str:
return
cookie = SimpleCookie()
cookie.load(cookie_str)
if name not in cookie:
return
value = cookie.get(name).value
if value != 'unset_%s' % name:
return json.loads(base64.b64decode(value))
def _set_cookie(self, response, name, value, age=3600):
cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age)
response.get('set-cookie').append(cookie)
def _unset_cookie(self, response, name):
self._set_cookie(response, name, 'unset_%s' % name, 0)