diff options
Diffstat (limited to 'sugar_network/client/implementations.py')
-rw-r--r-- | sugar_network/client/implementations.py | 221 |
1 files changed, 114 insertions, 107 deletions
diff --git a/sugar_network/client/implementations.py b/sugar_network/client/implementations.py index 5ab2e5b..841fb5f 100644 --- a/sugar_network/client/implementations.py +++ b/sugar_network/client/implementations.py @@ -62,7 +62,7 @@ class Routes(object): @route('GET', ['context', None], cmd='launch', arguments={'args': list}, mime_type='text/event-stream') - def launch(self, request, no_spawn): + def launch(self, request): activity_id = request.get('activity_id') if 'object_id' in request and not activity_id: activity_id = journal.get(request['object_id'], 'activity_id') @@ -73,22 +73,28 @@ class Routes(object): for context in self._checkin_context(request): yield {'event': 'launch', 'activity_id': activity_id}, request - impl = self._solve_impl(context, request) - if 'activity' not in context['type']: - app = request.get('context') or \ - _mimetype_context(impl['mime_type']) - enforce(app, 'Cannot find proper application') - self._checkin_impl(context, request, impl) - request = Request(path=['context', app], - object_id=impl['path'], session=request.session) - for context in self._checkin_context(request): - impl = self._solve_impl(context, request) - self._checkin_impl(context, request, impl) - - child = _exec(context, request, impl) - yield {'event': 'exec', 'activity_id': activity_id} - - status = child.wait() + acquired = [] + try: + impl = self._solve_impl(context, request) + if 'activity' not in context['type']: + app = request.get('context') or \ + _mimetype_context(impl['data']['mime_type']) + enforce(app, 'Cannot find proper application') + acquired += self._checkin_impl( + context, request, self._cache.acquire) + request = Request(path=['context', app], + object_id=impl['path'], session=request.session) + for context in self._checkin_context(request): + impl = self._solve_impl(context, request) + acquired += self._checkin_impl( + context, request, self._cache.acquire) + + child = _exec(context, request, impl) + yield {'event': 'exec', 'activity_id': activity_id} + status = child.wait() + finally: + self._cache.release(*acquired) + _logger.debug('Exit %s[%s]: %r', context.guid, child.pid, status) enforce(status == 0, 'Process exited with %r status', status) yield {'event': 'exit', 'activity_id': activity_id} @@ -102,14 +108,16 @@ class Routes(object): cloned_path = context.path('.clone') if request.content: impl = self._solve_impl(context, request) - self._checkin_impl(context, request, impl) + self._checkin_impl(context, request, self._cache.checkout) impl_path = relpath(dirname(impl['path']), context.path()) os.symlink(impl_path, cloned_path) - self._cache.checkout(impl['guid']) yield {'event': 'ready'} else: cloned_impl = basename(os.readlink(cloned_path)) - self._cache.checkin(cloned_impl) + meta = self._volume['implementation'].get( + cloned_impl).meta('data') + size = meta.get('unpack_size') or meta['blob_size'] + self._cache.checkin(cloned_impl, size) os.unlink(cloned_path) @route('GET', ['context', None], cmd='clone', @@ -140,11 +148,10 @@ class Routes(object): raise http.ServiceUnavailable, error, sys.exc_info()[2] def _checkin_context(self, request, layer=None): + contexts = self._volume['context'] guid = request.guid - if layer and not request.content and \ - not self._volume['context'].exists(guid): + if layer and not request.content and not contexts.exists(guid): return - contexts = self._volume['context'] if not contexts.exists(guid): context = self._call(method='GET', path=['context', guid]) @@ -184,47 +191,95 @@ class Routes(object): _logger.debug('Solving %r stability=%r', request.guid, stability) - if 'activity' not in context['type']: - _logger.debug('Cloniing %r', request.guid) - response = Response() - blob = self._call(method='GET', path=['context', request.guid], - cmd='clone', stability=stability, response=response) - response.meta['data']['blob'] = blob - return response.meta - solution, stale = self._cache_solution_get(request.guid, stability) if stale is False: _logger.debug('Reuse cached %r solution', request.guid) elif solution is not None and not self.inline(): - _logger.debug('Reuse stale %r solution in offline', request.guid) - else: - _logger.debug('Solve %r', request.guid) + _logger.debug('Reuse stale %r in offline', request.guid) + elif 'activity' in context['type']: from sugar_network.client import solver solution = self._map_exceptions(solver.solve, self.fallback, request.guid, stability) + else: + response = Response() + blob = self._call(method='GET', path=['context', request.guid], + cmd='clone', stability=stability, response=response) + response.meta['data']['blob'] = blob + solution = [response.meta] + request.session['solution'] = solution return solution[0] - def _checkin_impl(self, context, request, sel): - if 'activity' not in context['type']: - self._cache_impl(context, sel) - return + def _checkin_impl(self, context, request, cache_call): + if 'clone' in context['layer']: + cache_call = self._cache.checkout + impls = self._volume['implementation'] - to_install = [] - for sel in request.session['solution']: - if 'install' in sel: - enforce(self.inline(), http.ServiceUnavailable, - 'Installation is not available in offline') - to_install.extend(sel.pop('install')) - if to_install: - packagekit.install(to_install) + if 'activity' in context['type']: + to_install = [] + for sel in request.session['solution']: + if 'install' in sel: + enforce(self.inline(), http.ServiceUnavailable, + 'Installation is not available in offline') + to_install.extend(sel.pop('install')) + if to_install: + packagekit.install(to_install) + + def cache_impl(sel): + guid = sel['guid'] + data = sel['data'] + data_path = sel['path'] = impls.path(guid, 'data') + size = data.get('unpack_size') or data['blob_size'] + + blob = None + if 'blob' in data: + blob = data.pop('blob') + + if impls.exists(guid): + return cache_call(guid, size) + + if blob is None: + blob = self._call(method='GET', + path=['implementation', guid, 'data']) + try: + if not exists(dirname(data_path)): + os.makedirs(dirname(data_path)) + if 'activity' in context['type']: + self._cache.ensure(size, data['blob_size']) + with toolkit.TemporaryFile() as tmp_file: + shutil.copyfileobj(blob, tmp_file) + tmp_file.seek(0) + with Bundle(tmp_file, 'application/zip') as bundle: + bundle.extractall(data_path, prefix=bundle.rootdir) + for exec_dir in ('bin', 'activity'): + bin_path = join(data_path, exec_dir) + if not exists(bin_path): + continue + for filename in os.listdir(bin_path): + os.chmod(join(bin_path, filename), 0755) + else: + self._cache.ensure(size) + with file(data_path, 'wb') as f: + shutil.copyfileobj(blob, f) + impl = sel.copy() + impl['layer'] = [] + impl['ctime'] = impl['mtime'] = int(time.time()) + impl['author'] = {} + impl['notes'] = '' + impl['tags'] = [] + impls.create(impl) + return cache_call(guid, size) + except Exception: + shutil.rmtree(data_path, ignore_errors=True) + raise + result = [] for sel in request.session['solution']: if 'path' not in sel and sel['stability'] != 'packaged': - self._cache_impl(context, sel) - - self._cache_solution(context.guid, + result.append(cache_impl(sel)) + self._cache_solution_set(context.guid, request.session['stability'], request.session['solution']) + return result def _cache_solution_path(self, guid): return client.path('solutions', guid[:2], guid) @@ -248,70 +303,17 @@ class Routes(object): stale = (self._node_mtime > os.stat(path).st_mtime) if not stale: stale = (packagekit.mtime() > os.stat(path).st_mtime) + return _CachedSolution(solution), stale - return solution, stale - - def _cache_solution(self, guid, stability, solution): + def _cache_solution_set(self, guid, stability, solution): + if isinstance(solution, _CachedSolution): + return path = self._cache_solution_path(guid) if not exists(dirname(path)): os.makedirs(dirname(path)) with file(path, 'w') as f: json.dump([client.api_url.value, stability, solution], f) - def _cache_impl(self, context, sel): - guid = sel['guid'] - impls = self._volume['implementation'] - data_path = sel['path'] = impls.path(guid, 'data') - - if impls.exists(guid): - self._cache.checkin(guid) - return - - if 'data' in sel: - data = sel.pop('data') - blob = data.pop('blob') - else: - response = Response() - blob = self._call(method='GET', - path=['implementation', guid, 'data'], response=response) - data = response.meta - for key in ('seqno', 'url'): - if key in data: - del data[key] - - try: - if not exists(dirname(data_path)): - os.makedirs(dirname(data_path)) - if 'activity' in context['type']: - self._cache.ensure(data['unpack_size'], data['blob_size']) - with toolkit.TemporaryFile() as tmp_file: - shutil.copyfileobj(blob, tmp_file) - tmp_file.seek(0) - with Bundle(tmp_file, 'application/zip') as bundle: - bundle.extractall(data_path, prefix=bundle.rootdir) - for exec_dir in ('bin', 'activity'): - bin_path = join(data_path, exec_dir) - if not exists(bin_path): - continue - for filename in os.listdir(bin_path): - os.chmod(join(bin_path, filename), 0755) - else: - self._cache.ensure(data['blob_size']) - with file(data_path, 'wb') as f: - shutil.copyfileobj(blob, f) - impl = sel.copy() - impl['data'] = data - impl['layer'] = [] - impl['ctime'] = impl['mtime'] = int(time.time()) - impl['author'] = {} - impl['notes'] = '' - impl['tags'] = [] - impls.create(impl) - self._cache.checkin(guid) - except Exception: - shutil.rmtree(data_path, ignore_errors=True) - raise - def _get_clone(self, request, response): for context in self._checkin_context(request): if 'clone' not in context['layer']: @@ -352,7 +354,8 @@ def _exec(context, request, sel): if not exists(path): os.makedirs(path) - args = sel['command'] + [ + cmd = sel['data']['spec']['*-*']['commands']['activity']['exec'] + args = cmd.split() + [ '-b', request.guid, '-a', request.session['activity_id'], ] @@ -404,3 +407,7 @@ def _exec(context, request, sel): logging.exception('Failed to execute %r args=%r', sel, args) finally: os._exit(1) + + +class _CachedSolution(list): + pass |