diff options
Diffstat (limited to 'sugar_network/client/routes.py')
-rw-r--r-- | sugar_network/client/routes.py | 381 |
1 files changed, 97 insertions, 284 deletions
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index 942b052..dfbda6f 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -13,38 +13,38 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +# pylint: disable=W0611 + import os import logging import httplib from os.path import join from sugar_network import db, client, node, toolkit, model -from sugar_network.client import journal, clones, injector +from sugar_network.client import journal, implementations from sugar_network.node.slave import SlaveRoutes from sugar_network.toolkit import netlink, mountpoints from sugar_network.toolkit.router import ACL, Request, Response, Router -from sugar_network.toolkit.router import route, fallbackroute -from sugar_network.toolkit.spec import Spec +from sugar_network.toolkit.router import route, fallbackroute, postroute from sugar_network.toolkit import zeroconf, coroutine, http, enforce # Top-level directory name to keep SN data on mounted devices _SN_DIRNAME = 'sugar-network' -_LOCAL_PROPS = frozenset(['favorite', 'clone']) - # Flag file to recognize a directory as a synchronization directory _SYNC_DIRNAME = 'sugar-network-sync' - _RECONNECT_TIMEOUT = 3 _RECONNECT_TIMEOUT_MAX = 60 * 15 +_LOCAL_LAYERS = frozenset(['local', 'clone', 'favorite']) _logger = logging.getLogger('client.routes') -class ClientRoutes(model.Routes, journal.Routes): +class ClientRoutes(model.FrontRoutes, implementations.Routes, journal.Routes): def __init__(self, home_volume, api_url=None, no_subscription=False): - model.Routes.__init__(self) + model.FrontRoutes.__init__(self) + implementations.Routes.__init__(self, home_volume) if not client.no_dbus.value: journal.Routes.__init__(self) @@ -57,7 +57,7 @@ class ClientRoutes(model.Routes, journal.Routes): self._no_subscription = no_subscription self._server_mode = not api_url - home_volume.broadcast = self.broadcast + self._got_offline() if self._server_mode: mountpoints.connect(_SN_DIRNAME, @@ -74,6 +74,23 @@ class ClientRoutes(model.Routes, journal.Routes): self._got_offline() self._local.volume.close() + @postroute + def postroute(self, request, response, result, error): + if error is None or isinstance(error, http.StatusPass): + return + event = {'event': 'failure', + 'exception': type(error).__name__, + 'error': str(error), + 'method': request.method, + 'cmd': request.cmd, + 'resource': request.resource, + 'guid': request.guid, + 'prop': request.prop, + } + event.update(request) + event.update(request.session) + self.broadcast(event) + @fallbackroute('GET', ['hub']) def hub(self, request, response): """Serve Hub via HTTP instead of file:// for IPC users. @@ -104,7 +121,7 @@ class ClientRoutes(model.Routes, journal.Routes): @fallbackroute('GET', ['packages']) def route_packages(self, request, response): if self._inline.is_set(): - return self._node_call(request, response) + return self.fallback(request, response) else: # Let caller know that we are in offline and # no way to process specified request on the node @@ -127,158 +144,60 @@ class ClientRoutes(model.Routes, journal.Routes): def whoami(self, request, response): if self._inline.is_set(): - return self._node_call(request, response) + return self.fallback(request, response) else: return {'roles': [], 'guid': client.sugar_uid()} @route('GET', [None], - arguments={'reply': ('guid',), 'clone': int, 'favorite': bool}, + arguments={ + 'offset': int, + 'limit': int, + 'reply': ('guid',), + 'layer': list, + }, mime_type='application/json') - def find(self, request, response, clone, favorite): - if not self._inline.is_set() or clone or favorite: + def find(self, request, response, layer): + if set(request.get('layer', [])) & set(['favorite', 'clone']): return self._local.call(request, response) - else: - return self._proxy_get(request, response) - @route('GET', [None, None], - arguments={'reply': list}, mime_type='application/json') + reply = request.setdefault('reply', ['guid']) + if 'layer' not in reply: + return self.fallback(request, response) + + if 'guid' not in reply: + # Otherwise there is no way to mixin local `layer` + reply.append('guid') + result = self.fallback(request, response) + + directory = self._local.volume[request.resource] + for item in result['result']: + if directory.exists(item['guid']): + existing_layer = directory.get(item['guid'])['layer'] + item['layer'][:] = set(item['layer']) | set(existing_layer) + + return result + + @route('GET', [None, None], mime_type='application/json') def get(self, request, response): - return self._proxy_get(request, response) + if self._local.volume[request.resource].exists(request.guid): + return self._local.call(request, response) + else: + return self.fallback(request, response) @route('GET', [None, None, None], mime_type='application/json') def get_prop(self, request, response): - return self._proxy_get(request, response) - - @route('GET', ['context', None], cmd='make') - def make(self, request): - for event in injector.make(request.guid): - event['event'] = 'make' - self.broadcast(event) - - @route('GET', ['context', None], cmd='launch', - arguments={'args': list}) - def launch(self, request, args, activity_id=None, - object_id=None, uri=None, color=None, no_spawn=None): - - def do_launch(): - for event in injector.launch(request.guid, args, - activity_id=activity_id, object_id=object_id, uri=uri, - color=color): - event['event'] = 'launch' - self.broadcast(event) - - if no_spawn: - do_launch() - else: - self._jobs.spawn(do_launch) - - @route('PUT', ['context', None], cmd='clone', - arguments={'force': False, 'nodeps': False, 'requires': list}) - def clone_context(self, request): - enforce(self._inline.is_set(), 'Not available in offline') - - context_type = self._node_call(method='GET', - path=['context', request.guid, 'type']) - if 'stability' not in request: - request['stability'] = client.stability(request.guid) - - if 'activity' in context_type: - self._clone_activity(request) - elif 'content' in context_type: - - def get_props(): - impls = self._node_call(method='GET', - path=['implementation'], context=request.guid, - stability=request['stability'], order_by='-version', - limit=1, reply=['guid'])['result'] - enforce(impls, http.NotFound, 'No implementations') - impl_id = impls[0]['guid'] - props = self._node_call(method='GET', - path=['context', request.guid], - reply=['title', 'description']) - props['preview'] = self._node_call(method='GET', - path=['context', request.guid, 'preview']) - data_response = Response() - props['data'] = self._node_call(response=data_response, - method='GET', - path=['implementation', impl_id, 'data']) - props['mime_type'] = data_response.content_type or \ - 'application/octet' - props['activity_id'] = impl_id - return props - - self._clone_jobject(request, get_props) + if self._local.volume[request.resource].exists(request.guid): + return self._local.call(request, response) else: - raise RuntimeError('No way to clone') - - @route('PUT', ['artifact', None], cmd='clone', arguments={'force': False}) - def clone_artifact(self, request): - enforce(self._inline.is_set(), 'Not available in offline') - - def get_props(): - props = self._node_call(method='GET', - path=['artifact', request.guid], - reply=['title', 'description', 'context']) - props['preview'] = self._node_call(method='GET', - path=['artifact', request.guid, 'preview']) - props['data'] = self._node_call(method='GET', - path=['artifact', request.guid, 'data']) - props['activity'] = props.pop('context') - return props - - self._clone_jobject(request, get_props) - - @route('PUT', ['context', None], cmd='favorite') - def favorite(self, request): - if request.content or \ - self._local.volume['context'].exists(request.guid): - self._checkin_context(request.guid, {'favorite': request.content}) - - @route('GET', ['context', None], cmd='feed', - mime_type='application/json') - def feed(self, request, response): - try: - context = self._local.volume['context'].get(request.guid) - except http.NotFound: - context = None - if context is None or context['clone'] != 2: - if self._inline.is_set(): - return self._node_call(request, response) - else: - # Let caller know that we are in offline and - # no way to process specified request on the node - raise http.ServiceUnavailable() - - versions = [] - for path in clones.walk(context.guid): - try: - spec = Spec(root=path) - except Exception: - toolkit.exception(_logger, 'Failed to read %r spec file', path) - continue - versions.append({ - 'guid': spec.root, - 'version': spec['version'], - 'arch': '*-*', - 'stability': 'stable', - 'commands': { - 'activity': { - 'exec': spec['Activity', 'exec'], - }, - }, - 'requires': spec.requires, - }) - - return {'name': context.get('title', - accept_language=request.accept_language), - 'implementations': versions, - } + return self.fallback(request, response) @fallbackroute() - def _node_call(self, request=None, response=None, method=None, path=None, - **kwargs): + def fallback(self, request=None, response=None, method=None, path=None, + cmd=None, **kwargs): if request is None: - request = Request(method=method, path=path) + request = Request(method=method, path=path, cmd=cmd) + if response is None: + response = Response() request.update(kwargs) if self._inline.is_set(): if client.layers.value and \ @@ -301,6 +220,7 @@ class ClientRoutes(model.Routes, journal.Routes): _logger.debug('Got online on %r', self._node) self._inline.set() self.broadcast({'event': 'inline', 'state': 'online'}) + self._local.volume.broadcast = None def _got_offline(self): if self._inline.is_set(): @@ -308,10 +228,12 @@ class ClientRoutes(model.Routes, journal.Routes): self._node.close() self._inline.clear() self.broadcast({'event': 'inline', 'state': 'offline'}) + self._local.volume.broadcast = self.broadcast def _fall_offline(self): - _logger.debug('Fall to offline on %r', self._node) - self._inline_job.kill() + if self._inline_job: + _logger.debug('Fall to offline on %r', self._node) + self._inline_job.kill() def _restart_online(self): self._fall_offline() @@ -338,7 +260,7 @@ class ClientRoutes(model.Routes, journal.Routes): if event.get('resource') == 'implementation': mtime = event.get('mtime') if mtime: - injector.invalidate_solutions(mtime) + self.invalidate_solutions(mtime) self.broadcast(event) def handshake(url): @@ -347,7 +269,7 @@ class ClientRoutes(model.Routes, journal.Routes): info = self._node.get(cmd='info') impl_info = info['documents'].get('implementation') if impl_info: - injector.invalidate_solutions(impl_info['mtime']) + self.invalidate_solutions(impl_info['mtime']) if self._inline.is_set(): _logger.info('Reconnected to %r node', url) else: @@ -415,127 +337,14 @@ class ClientRoutes(model.Routes, journal.Routes): self._inline_job.kill() self._got_offline() - def _checkin_context(self, guid, props): - contexts = self._local.volume['context'] - - if contexts.exists(guid): - contexts.update(guid, props) - else: - copy = self._node_call(method='GET', path=['context', guid], - reply=[ - 'type', 'title', 'summary', 'description', - 'homepage', 'mime_types', 'dependencies', - ]) - copy.update(props) - copy['guid'] = guid - contexts.create(copy) - for prop in ('icon', 'artifact_icon', 'preview'): - blob = self._node_call(method='GET', - path=['context', guid, prop]) - if blob is not None: - contexts.update(guid, {prop: {'blob': blob}}) - - def _proxy_get(self, request, response): - resource = request.resource - if resource not in ('context', 'artifact'): - return self._node_call(request, response) - - if not self._inline.is_set(): - return self._local.call(request, response) - - request_guid = request.guid if len(request.path) > 1 else None - if request_guid and self._local.volume[resource].exists(request_guid): - return self._local.call(request, response) - - if request.prop is not None: - mixin = None - else: - reply = request.setdefault('reply', ['guid']) - mixin = set(reply) & _LOCAL_PROPS - if mixin: - # Otherwise there is no way to mixin _LOCAL_PROPS - if not request_guid and 'guid' not in reply: - reply.append('guid') - if resource == 'context' and 'type' not in reply: - reply.append('type') - - result = self._node_call(request, response) - if not mixin: - return result - - if request_guid: - items = [result] - else: - items = result['result'] - - def mixin_jobject(props, guid): - if 'clone' in mixin: - props['clone'] = 2 if journal.exists(guid) else 0 - if 'favorite' in mixin: - props['favorite'] = bool(int(journal.get(guid, 'keep') or 0)) - - if resource == 'context': - contexts = self._local.volume['context'] - for props in items: - guid = request_guid or props['guid'] - if 'activity' in props['type']: - if contexts.exists(guid): - patch = contexts.get(guid).properties(mixin) - else: - patch = dict([(i, contexts.metadata[i].default) - for i in mixin]) - props.update(patch) - elif 'content' in props['type']: - mixin_jobject(props, guid) - elif resource == 'artifact': - for props in items: - mixin_jobject(props, request_guid or props['guid']) - - return result - - def _clone_activity(self, request): - if not request.content: - clones.wipeout(request.guid) - return - for __ in clones.walk(request.guid): - if not request.get('force'): - return - break - self._checkin_context(request.guid, {'clone': 1}) - if request.get('nodeps'): - pipe = injector.clone_impl(request.guid, - stability=request['stability'], - requires=request.get('requires')) - else: - pipe = injector.clone(request.guid) - event = {} - for event in pipe: - event['event'] = 'clone' - self.broadcast(event) - if event.get('state') == 'failure': - self._checkin_context(request.guid, {'clone': 0}) - raise RuntimeError(event['error']) - - def _clone_jobject(self, request, get_props): - if request.content: - if request['force'] or not journal.exists(request.guid): - self.journal_update(request.guid, **get_props()) - self.broadcast({ - 'event': 'show_journal', - 'uid': request.guid, - }) - else: - if journal.exists(request.guid): - self.journal_delete(request.guid) - class CachedClientRoutes(ClientRoutes): def __init__(self, home_volume, api_url=None, no_subscription=False): - ClientRoutes.__init__(self, home_volume, api_url, no_subscription) self._push_seq = toolkit.PersistentSequence( join(home_volume.root, 'push.sequence'), [1, None]) self._push_job = coroutine.Pool() + ClientRoutes.__init__(self, home_volume, api_url, no_subscription) def _got_online(self): ClientRoutes._got_online(self) @@ -548,6 +357,7 @@ class CachedClientRoutes(ClientRoutes): def _push(self): pushed_seq = toolkit.Sequence() skiped_seq = toolkit.Sequence() + volume = self._local.volume def push(request, seq): try: @@ -559,44 +369,45 @@ class CachedClientRoutes(ClientRoutes): else: pushed_seq.include(seq) - for document, directory in self._local.volume.items(): - if directory.mtime <= self._push_seq.mtime: + for res in volume.resources: + if volume.mtime(res) <= self._push_seq.mtime: continue - _logger.debug('Check %r local cache to push', document) + _logger.debug('Check %r local cache to push', res) - for guid, patch in directory.diff(self._push_seq, layer='local'): + for guid, patch in volume[res].diff(self._push_seq, layer='local'): diff = {} diff_seq = toolkit.Sequence() post_requests = [] for prop, meta, seqno in patch: if 'blob' in meta: - request = Request(method='PUT', - path=[document, guid, prop]) + request = Request(method='PUT', path=[res, guid, prop]) request.content_type = meta['mime_type'] request.content_length = os.stat(meta['blob']).st_size request.content_stream = \ toolkit.iter_file(meta['blob']) post_requests.append((request, seqno)) elif 'url' in meta: - request = Request(method='PUT', - path=[document, guid, prop]) + request = Request(method='PUT', path=[res, guid, prop]) request.content_type = 'application/json' request.content = meta post_requests.append((request, seqno)) else: - diff[prop] = meta['value'] + value = meta['value'] + if prop == 'layer': + value = list(set(value) - _LOCAL_LAYERS) + diff[prop] = value diff_seq.include(seqno, seqno) if not diff: continue if 'guid' in diff: - request = Request(method='POST', path=[document]) + request = Request(method='POST', path=[res]) access = ACL.CREATE | ACL.WRITE else: - request = Request(method='PUT', path=[document, guid]) + request = Request(method='PUT', path=[res, guid]) access = ACL.WRITE for name in diff.keys(): - if not (directory.metadata[name].acl & access): + if not (volume[res].metadata[name].acl & access): del diff[name] request.content_type = 'application/json' request.content = diff @@ -613,23 +424,25 @@ class CachedClientRoutes(ClientRoutes): self._push_seq.exclude(pushed_seq) if not skiped_seq: self._push_seq.stretch() - # No any decent reasons to keep fail reports after uploding. - # TODO The entire offlile synchronization should be improved, - # for now, it is possible to have a race here - self._local.volume['report'].wipe() + if 'report' in volume: + # No any decent reasons to keep fail reports after uploding. + # TODO The entire offlile synchronization should be improved, + # for now, it is possible to have a race here + volume['report'].wipe() + self._push_seq.commit() self.broadcast({'event': 'push'}) -class _LocalRoutes(db.Routes, Router): +class _LocalRoutes(model.VolumeRoutes, Router): def __init__(self, volume): - db.Routes.__init__(self, volume) + model.VolumeRoutes.__init__(self, volume) Router.__init__(self, self) def on_create(self, request, props, event): props['layer'] = tuple(props['layer']) + ('local',) - db.Routes.on_create(self, request, props, event) + model.VolumeRoutes.on_create(self, request, props, event) class _NodeRoutes(SlaveRoutes, Router): |