Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/client/implementations.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/client/implementations.py')
-rw-r--r--sugar_network/client/implementations.py221
1 files changed, 114 insertions, 107 deletions
diff --git a/sugar_network/client/implementations.py b/sugar_network/client/implementations.py
index 5ab2e5b..841fb5f 100644
--- a/sugar_network/client/implementations.py
+++ b/sugar_network/client/implementations.py
@@ -62,7 +62,7 @@ class Routes(object):
@route('GET', ['context', None], cmd='launch', arguments={'args': list},
mime_type='text/event-stream')
- def launch(self, request, no_spawn):
+ def launch(self, request):
activity_id = request.get('activity_id')
if 'object_id' in request and not activity_id:
activity_id = journal.get(request['object_id'], 'activity_id')
@@ -73,22 +73,28 @@ class Routes(object):
for context in self._checkin_context(request):
yield {'event': 'launch', 'activity_id': activity_id}, request
- impl = self._solve_impl(context, request)
- if 'activity' not in context['type']:
- app = request.get('context') or \
- _mimetype_context(impl['mime_type'])
- enforce(app, 'Cannot find proper application')
- self._checkin_impl(context, request, impl)
- request = Request(path=['context', app],
- object_id=impl['path'], session=request.session)
- for context in self._checkin_context(request):
- impl = self._solve_impl(context, request)
- self._checkin_impl(context, request, impl)
-
- child = _exec(context, request, impl)
- yield {'event': 'exec', 'activity_id': activity_id}
-
- status = child.wait()
+ acquired = []
+ try:
+ impl = self._solve_impl(context, request)
+ if 'activity' not in context['type']:
+ app = request.get('context') or \
+ _mimetype_context(impl['data']['mime_type'])
+ enforce(app, 'Cannot find proper application')
+ acquired += self._checkin_impl(
+ context, request, self._cache.acquire)
+ request = Request(path=['context', app],
+ object_id=impl['path'], session=request.session)
+ for context in self._checkin_context(request):
+ impl = self._solve_impl(context, request)
+ acquired += self._checkin_impl(
+ context, request, self._cache.acquire)
+
+ child = _exec(context, request, impl)
+ yield {'event': 'exec', 'activity_id': activity_id}
+ status = child.wait()
+ finally:
+ self._cache.release(*acquired)
+
_logger.debug('Exit %s[%s]: %r', context.guid, child.pid, status)
enforce(status == 0, 'Process exited with %r status', status)
yield {'event': 'exit', 'activity_id': activity_id}
@@ -102,14 +108,16 @@ class Routes(object):
cloned_path = context.path('.clone')
if request.content:
impl = self._solve_impl(context, request)
- self._checkin_impl(context, request, impl)
+ self._checkin_impl(context, request, self._cache.checkout)
impl_path = relpath(dirname(impl['path']), context.path())
os.symlink(impl_path, cloned_path)
- self._cache.checkout(impl['guid'])
yield {'event': 'ready'}
else:
cloned_impl = basename(os.readlink(cloned_path))
- self._cache.checkin(cloned_impl)
+ meta = self._volume['implementation'].get(
+ cloned_impl).meta('data')
+ size = meta.get('unpack_size') or meta['blob_size']
+ self._cache.checkin(cloned_impl, size)
os.unlink(cloned_path)
@route('GET', ['context', None], cmd='clone',
@@ -140,11 +148,10 @@ class Routes(object):
raise http.ServiceUnavailable, error, sys.exc_info()[2]
def _checkin_context(self, request, layer=None):
+ contexts = self._volume['context']
guid = request.guid
- if layer and not request.content and \
- not self._volume['context'].exists(guid):
+ if layer and not request.content and not contexts.exists(guid):
return
- contexts = self._volume['context']
if not contexts.exists(guid):
context = self._call(method='GET', path=['context', guid])
@@ -184,47 +191,95 @@ class Routes(object):
_logger.debug('Solving %r stability=%r', request.guid, stability)
- if 'activity' not in context['type']:
- _logger.debug('Cloniing %r', request.guid)
- response = Response()
- blob = self._call(method='GET', path=['context', request.guid],
- cmd='clone', stability=stability, response=response)
- response.meta['data']['blob'] = blob
- return response.meta
-
solution, stale = self._cache_solution_get(request.guid, stability)
if stale is False:
_logger.debug('Reuse cached %r solution', request.guid)
elif solution is not None and not self.inline():
- _logger.debug('Reuse stale %r solution in offline', request.guid)
- else:
- _logger.debug('Solve %r', request.guid)
+ _logger.debug('Reuse stale %r in offline', request.guid)
+ elif 'activity' in context['type']:
from sugar_network.client import solver
solution = self._map_exceptions(solver.solve,
self.fallback, request.guid, stability)
+ else:
+ response = Response()
+ blob = self._call(method='GET', path=['context', request.guid],
+ cmd='clone', stability=stability, response=response)
+ response.meta['data']['blob'] = blob
+ solution = [response.meta]
+
request.session['solution'] = solution
return solution[0]
- def _checkin_impl(self, context, request, sel):
- if 'activity' not in context['type']:
- self._cache_impl(context, sel)
- return
+ def _checkin_impl(self, context, request, cache_call):
+ if 'clone' in context['layer']:
+ cache_call = self._cache.checkout
+ impls = self._volume['implementation']
- to_install = []
- for sel in request.session['solution']:
- if 'install' in sel:
- enforce(self.inline(), http.ServiceUnavailable,
- 'Installation is not available in offline')
- to_install.extend(sel.pop('install'))
- if to_install:
- packagekit.install(to_install)
+ if 'activity' in context['type']:
+ to_install = []
+ for sel in request.session['solution']:
+ if 'install' in sel:
+ enforce(self.inline(), http.ServiceUnavailable,
+ 'Installation is not available in offline')
+ to_install.extend(sel.pop('install'))
+ if to_install:
+ packagekit.install(to_install)
+
+ def cache_impl(sel):
+ guid = sel['guid']
+ data = sel['data']
+ data_path = sel['path'] = impls.path(guid, 'data')
+ size = data.get('unpack_size') or data['blob_size']
+
+ blob = None
+ if 'blob' in data:
+ blob = data.pop('blob')
+
+ if impls.exists(guid):
+ return cache_call(guid, size)
+
+ if blob is None:
+ blob = self._call(method='GET',
+ path=['implementation', guid, 'data'])
+ try:
+ if not exists(dirname(data_path)):
+ os.makedirs(dirname(data_path))
+ if 'activity' in context['type']:
+ self._cache.ensure(size, data['blob_size'])
+ with toolkit.TemporaryFile() as tmp_file:
+ shutil.copyfileobj(blob, tmp_file)
+ tmp_file.seek(0)
+ with Bundle(tmp_file, 'application/zip') as bundle:
+ bundle.extractall(data_path, prefix=bundle.rootdir)
+ for exec_dir in ('bin', 'activity'):
+ bin_path = join(data_path, exec_dir)
+ if not exists(bin_path):
+ continue
+ for filename in os.listdir(bin_path):
+ os.chmod(join(bin_path, filename), 0755)
+ else:
+ self._cache.ensure(size)
+ with file(data_path, 'wb') as f:
+ shutil.copyfileobj(blob, f)
+ impl = sel.copy()
+ impl['layer'] = []
+ impl['ctime'] = impl['mtime'] = int(time.time())
+ impl['author'] = {}
+ impl['notes'] = ''
+ impl['tags'] = []
+ impls.create(impl)
+ return cache_call(guid, size)
+ except Exception:
+ shutil.rmtree(data_path, ignore_errors=True)
+ raise
+ result = []
for sel in request.session['solution']:
if 'path' not in sel and sel['stability'] != 'packaged':
- self._cache_impl(context, sel)
-
- self._cache_solution(context.guid,
+ result.append(cache_impl(sel))
+ self._cache_solution_set(context.guid,
request.session['stability'], request.session['solution'])
+ return result
def _cache_solution_path(self, guid):
return client.path('solutions', guid[:2], guid)
@@ -248,70 +303,17 @@ class Routes(object):
stale = (self._node_mtime > os.stat(path).st_mtime)
if not stale:
stale = (packagekit.mtime() > os.stat(path).st_mtime)
+ return _CachedSolution(solution), stale
- return solution, stale
-
- def _cache_solution(self, guid, stability, solution):
+ def _cache_solution_set(self, guid, stability, solution):
+ if isinstance(solution, _CachedSolution):
+ return
path = self._cache_solution_path(guid)
if not exists(dirname(path)):
os.makedirs(dirname(path))
with file(path, 'w') as f:
json.dump([client.api_url.value, stability, solution], f)
- def _cache_impl(self, context, sel):
- guid = sel['guid']
- impls = self._volume['implementation']
- data_path = sel['path'] = impls.path(guid, 'data')
-
- if impls.exists(guid):
- self._cache.checkin(guid)
- return
-
- if 'data' in sel:
- data = sel.pop('data')
- blob = data.pop('blob')
- else:
- response = Response()
- blob = self._call(method='GET',
- path=['implementation', guid, 'data'], response=response)
- data = response.meta
- for key in ('seqno', 'url'):
- if key in data:
- del data[key]
-
- try:
- if not exists(dirname(data_path)):
- os.makedirs(dirname(data_path))
- if 'activity' in context['type']:
- self._cache.ensure(data['unpack_size'], data['blob_size'])
- with toolkit.TemporaryFile() as tmp_file:
- shutil.copyfileobj(blob, tmp_file)
- tmp_file.seek(0)
- with Bundle(tmp_file, 'application/zip') as bundle:
- bundle.extractall(data_path, prefix=bundle.rootdir)
- for exec_dir in ('bin', 'activity'):
- bin_path = join(data_path, exec_dir)
- if not exists(bin_path):
- continue
- for filename in os.listdir(bin_path):
- os.chmod(join(bin_path, filename), 0755)
- else:
- self._cache.ensure(data['blob_size'])
- with file(data_path, 'wb') as f:
- shutil.copyfileobj(blob, f)
- impl = sel.copy()
- impl['data'] = data
- impl['layer'] = []
- impl['ctime'] = impl['mtime'] = int(time.time())
- impl['author'] = {}
- impl['notes'] = ''
- impl['tags'] = []
- impls.create(impl)
- self._cache.checkin(guid)
- except Exception:
- shutil.rmtree(data_path, ignore_errors=True)
- raise
-
def _get_clone(self, request, response):
for context in self._checkin_context(request):
if 'clone' not in context['layer']:
@@ -352,7 +354,8 @@ def _exec(context, request, sel):
if not exists(path):
os.makedirs(path)
- args = sel['command'] + [
+ cmd = sel['data']['spec']['*-*']['commands']['activity']['exec']
+ args = cmd.split() + [
'-b', request.guid,
'-a', request.session['activity_id'],
]
@@ -404,3 +407,7 @@ def _exec(context, request, sel):
logging.exception('Failed to execute %r args=%r', sel, args)
finally:
os._exit(1)
+
+
+class _CachedSolution(list):
+ pass