From dbf9be353818e17a76df5da8e5ffe3f617d29b67 Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Sun, 01 Sep 2013 08:07:14 +0000 Subject: Process event streams on low level in router; make launch and clone API routes event streamable --- diff --git a/sugar-network b/sugar-network index 642af7f..cc56837 100755 --- a/sugar-network +++ b/sugar-network @@ -66,12 +66,11 @@ _LIST_RE = re.compile(r'\s*[;,:]+\s*') class ClientRouter(Router, ClientRoutes): def __init__(self): - home = db.Volume(client.path('db'), RESOURCES) + home = db.Volume(client.path('db'), RESOURCES, lazy_open=True) Router.__init__(self, self) ClientRoutes.__init__(self, home, client.api_url.value if not offline.value else None, no_subscription=True) - if not offline.value: for __ in self.subscribe(event='inline', state='online'): break @@ -261,6 +260,30 @@ class Application(application.Application): toolkit.ensure_key(client.key_path()) cp = ClientRouter() result = cp.call(request, response) + + if result is None: + pass + elif response.content_type == 'application/json': + self._dump(result) + elif isinstance(result, types.GeneratorType): + for chunk in result: + self._dump(chunk) + elif hasattr(result, 'read'): + if response.content_type == 'text/event-stream': + while True: + chunk = toolkit.readline(result) + if not chunk: + break + if chunk.startswith('data: '): + self._dump(loads(chunk[6:])) + else: + while True: + chunk = result.read(BUFFER_SIZE) + if not chunk: + break + sys.stdout.write(chunk) + else: + sys.stdout.write(result) finally: if server is not None: server.close() @@ -269,29 +292,6 @@ class Application(application.Application): if pid_path: os.unlink(pid_path) - if result is None: - return - - if response.content_type == 'application/json': - self._dump(result) - elif response.content_type == 'text/event-stream': - while True: - chunk = toolkit.readline(result) - if not chunk: - break - sys.stdout.write(chunk) - elif hasattr(result, 'read'): - while True: - chunk = result.read(BUFFER_SIZE) - if not chunk: - break - sys.stdout.write(chunk) - elif isinstance(result, types.GeneratorType): - for chunk in result: - sys.stdout.write(chunk) - else: - sys.stdout.write(result) - def _parse_path(self, request): if self.args and self.args[0].startswith('/'): request.path = self.args.pop(0).strip('/').split('/') diff --git a/sugar-network-client b/sugar-network-client index a528f27..2da4030 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -104,7 +104,7 @@ class Application(application.Daemon): volume = db.Volume(client.path('db'), RESOURCES, lazy_open=True) routes = CachedClientRoutes(volume, client.api_url.value if not client.server_mode.value else None) - router = Router(routes) + router = Router(routes, allow_spawn=True) logging.info('Listening for IPC requests on %s port', client.ipc_port.value) diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py index 09eb40a..3043d25 100644 --- a/sugar_network/client/cache.py +++ b/sugar_network/client/cache.py @@ -63,15 +63,14 @@ class Cache(object): if to_free <= 0: break - def checkin(self, guid, meta=None): + def checkin(self, guid): self._ensure_open() if guid in self._pool: self._pool.__getitem__(guid) return _logger.debug('Checkin %r', guid) impls = self._volume['implementation'] - if meta is None: - meta = impls.get(guid).meta('data') + meta = impls.get(guid).meta('data') size = meta.get('unpack_size') or meta['blob_size'] mtime = os.stat(impls.path(guid)).st_mtime self._pool[guid] = (size, mtime) diff --git a/sugar_network/client/implementations.py b/sugar_network/client/implementations.py index 34a9145..84c5128 100644 --- a/sugar_network/client/implementations.py +++ b/sugar_network/client/implementations.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -# pylint: disable-msg=E1101,W0611 +# pylint: disable=E1101 import os import re @@ -30,9 +30,9 @@ from os.path import join, exists, basename, dirname, relpath from sugar_network import client, toolkit from sugar_network.client.cache import Cache from sugar_network.client import journal, packagekit -from sugar_network.toolkit.router import Request, Response, route, postroute +from sugar_network.toolkit.router import Request, Response, route from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit import http, coroutine, exception, enforce +from sugar_network.toolkit import http, coroutine, enforce _MIMETYPE_DEFAULTS_KEY = '/desktop/sugar/journal/defaults' @@ -53,33 +53,53 @@ class Routes(object): def invalidate_solutions(self, mtime): self._node_mtime = mtime - @route('GET', ['context', None], cmd='launch', arguments={'args': list}) + @route('GET', ['context', None], cmd='launch', arguments={'args': list}, + mime_type='text/event-stream') def launch(self, request, no_spawn): + activity_id = request.get('activity_id') + if 'object_id' in request and not activity_id: + activity_id = journal.get(request['object_id'], 'activity_id') + if not activity_id: + activity_id = _activity_id_new() + request.session['activity_id'] = activity_id + for context in self._checkin_context(request): - impl = self._checkin_impl(context, request) - if 'activity' in context['type']: - self._exec(request, context, impl) - else: + yield {'event': 'launch', 'activity_id': activity_id}, request + + impl = self._solve_impl(context, request) + if 'activity' not in context['type']: app = request.get('context') or \ _mimetype_context(impl['mime_type']) enforce(app, 'Cannot find proper application') - doc = self._volume['implementation'].path(impl['guid'], 'data') - app_request = Request(path=['context', app], object_id=doc) - for app_context in self._checkin_context(app_request): - app_impl = self._checkin_impl(app_context, app_request) - self._exec(app_request, app_context, app_impl) - - @route('PUT', ['context', None], cmd='clone', arguments={'requires': list}) + self._checkin_impl(context, request, impl) + request = Request(path=['context', app], + object_id=impl['path'], session=request.session) + for context in self._checkin_context(request): + impl = self._solve_impl(context, request) + self._checkin_impl(context, request, impl) + + child = _exec(context, request, impl) + yield {'event': 'exec', 'activity_id': activity_id} + + status = child.wait() + _logger.debug('Exit %s[%s]: %r', context.guid, child.pid, status) + enforce(status == 0, 'Process exited with %r status', status) + yield {'event': 'exit', 'activity_id': activity_id} + + @route('PUT', ['context', None], cmd='clone', arguments={'requires': list}, + mime_type='text/event-stream') def clone(self, request): enforce(not request.content or self.inline(), http.ServiceUnavailable, 'Not available in offline') for context in self._checkin_context(request, 'clone'): cloned_path = context.path('.clone') if request.content: - impl = self._checkin_impl(context, request) + impl = self._solve_impl(context, request) + self._checkin_impl(context, request, impl) impl_path = relpath(dirname(impl['path']), context.path()) os.symlink(impl_path, cloned_path) self._cache.checkout(impl['guid']) + yield {'event': 'ready'} else: cloned_impl = basename(os.readlink(cloned_path)) self._cache.checkin(cloned_impl) @@ -145,20 +165,19 @@ class Routes(object): }) _logger.debug('Checked %r in: %r', guid, layer_value) - def _checkin_impl(self, context, request, clone=None): + def _solve_impl(self, context, request): stability = request.get('stability') or \ client.stability(request.guid) + _logger.debug('Solving %r stability=%r', request.guid, stability) + if 'activity' not in context['type']: _logger.debug('Cloniing %r', request.guid) response = Response() blob = self._call(method='GET', path=['context', request.guid], cmd='clone', stability=stability, response=response) - impl = response.meta - self._cache_impl(context, impl, blob, impl.pop('data')) - return impl - - _logger.debug('Making %r', request.guid) + response.meta['data']['blob'] = blob + return response.meta solution, stale = self._cache_solution_get(request.guid, stability) if stale is False: @@ -171,9 +190,16 @@ class Routes(object): solution = self._map_exceptions(solver.solve, self.fallback, request.guid, stability) request.session['solution'] = solution + request.session['stability'] = stability + return solution[0] + + def _checkin_impl(self, context, request, sel): + if 'activity' not in context['type']: + self._cache_impl(context, sel) + return to_install = [] - for sel in solution: + for sel in request.session['solution']: if 'install' in sel: enforce(self.inline(), http.ServiceUnavailable, 'Installation is not available in offline') @@ -181,95 +207,12 @@ class Routes(object): if to_install: packagekit.install(to_install) - for sel in solution: + for sel in request.session['solution']: if 'path' not in sel and sel['stability'] != 'packaged': self._cache_impl(context, sel) - self._cache_solution(request.guid, stability, solution) - return solution[0] - - def _exec(self, request, context, sel): - # pylint: disable-msg=W0212 - datadir = client.profile_path('data', context.guid) - logdir = client.profile_path('logs') - - args = sel['command'] + (request.get('args') or []) - object_id = request.get('object_id') - if object_id: - if 'activity_id' not in request: - activity_id = journal.get(object_id, 'activity_id') - if activity_id: - request['activity_id'] = activity_id - args.extend(['-o', object_id]) - activity_id = request.get('activity_id') - if not activity_id: - activity_id = request['activity_id'] = _activity_id_new() - uri = request.get('uri') - if uri: - args.extend(['-u', uri]) - args.extend([ - '-b', request.guid, - '-a', activity_id, - ]) - - for path in [ - join(datadir, 'instance'), - join(datadir, 'data'), - join(datadir, 'tmp'), - logdir, - ]: - if not exists(path): - os.makedirs(path) - - event = {'event': 'exec', - 'cmd': 'launch', - 'guid': request.guid, - 'args': args, - 'log_path': - toolkit.unique_filename(logdir, context.guid + '.log'), - } - event.update(request) - event.update(request.session) - self.broadcast(event) - - child = coroutine.fork() - if child is not None: - _logger.debug('Exec %s[%s]: %r', request.guid, child.pid, args) - child.watch(self.__sigchld_cb, child.pid, event) - return - - try: - with file('/dev/null', 'r') as f: - os.dup2(f.fileno(), 0) - with file(event['log_path'], 'a+') as f: - os.dup2(f.fileno(), 1) - os.dup2(f.fileno(), 2) - toolkit.init_logging() - - impl_path = sel['path'] - os.chdir(impl_path) - - environ = os.environ - environ['PATH'] = ':'.join([ - join(impl_path, 'activity'), - join(impl_path, 'bin'), - environ['PATH'], - ]) - environ['PYTHONPATH'] = impl_path + ':' + \ - environ.get('PYTHONPATH', '') - environ['SUGAR_BUNDLE_PATH'] = impl_path - environ['SUGAR_BUNDLE_ID'] = context.guid - environ['SUGAR_BUNDLE_NAME'] = \ - toolkit.gettext(context['title']).encode('utf8') - environ['SUGAR_BUNDLE_VERSION'] = sel['version'] - environ['SUGAR_ACTIVITY_ROOT'] = datadir - environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale') - - os.execvpe(args[0], args, environ) - except BaseException: - logging.exception('Failed to execute %r args=%r', sel, args) - finally: - os._exit(1) + self._cache_solution(context.guid, + request.session['stability'], request.session['solution']) def _cache_solution_path(self, guid): return client.path('cache', 'solutions', guid[:2], guid) @@ -303,16 +246,19 @@ class Routes(object): with file(path, 'w') as f: json.dump([client.api_url.value, stability, solution], f) - def _cache_impl(self, context, sel, blob=None, data=None): + def _cache_impl(self, context, sel): guid = sel['guid'] impls = self._volume['implementation'] data_path = sel['path'] = impls.path(guid, 'data') if impls.exists(guid): - self._cache.checkin(guid, data) + self._cache.checkin(guid) return - if blob is None: + if 'data' in sel: + data = sel.pop('data') + blob = data.pop('blob') + else: response = Response() blob = self._call(method='GET', path=['implementation', guid, 'data'], response=response) @@ -360,15 +306,6 @@ class Routes(object): 'guid', 'context', 'license', 'version', 'stability', 'data']) return impl.meta('data') - def __sigchld_cb(self, returncode, pid, event): - _logger.debug('Exit %s[%s]: %r', event['guid'], pid, returncode) - if returncode: - event['event'] = 'failure' - event['error'] = 'Process exited with %r status' % returncode - else: - event['event'] = 'exit' - self.broadcast(event) - def _activity_id_new(): data = '%s%s%s' % ( @@ -383,3 +320,75 @@ def _mimetype_context(mime_type): mime_type = _MIMETYPE_INVALID_CHARS.sub('_', mime_type) key = '/'.join([_MIMETYPE_DEFAULTS_KEY, mime_type]) return gconf.client_get_default().get_string(key) + + +def _exec(context, request, sel): + # pylint: disable-msg=W0212 + datadir = client.profile_path('data', context.guid) + logdir = client.profile_path('logs') + + for path in [ + join(datadir, 'instance'), + join(datadir, 'data'), + join(datadir, 'tmp'), + logdir, + ]: + if not exists(path): + os.makedirs(path) + + args = sel['command'] + [ + '-b', request.guid, + '-a', request.session['activity_id'], + ] + if 'object_id' in request: + args.extend(['-o', request['object_id']]) + if 'uri' in request: + args.extend(['-u', request['uri']]) + if 'args' in request: + args.extend(request['args']) + request.session['args'] = args + + log_path = toolkit.unique_filename(logdir, context.guid + '.log') + request.session['logs'] = [ + join(logdir, 'shell.log'), + join(logdir, 'sugar-network-client.log'), + log_path, + ] + + child = coroutine.fork() + if child is not None: + _logger.debug('Exec %s[%s]: %r', request.guid, child.pid, args) + return child + + try: + with file('/dev/null', 'r') as f: + os.dup2(f.fileno(), 0) + with file(log_path, 'a+') as f: + os.dup2(f.fileno(), 1) + os.dup2(f.fileno(), 2) + toolkit.init_logging() + + impl_path = sel['path'] + os.chdir(impl_path) + + environ = os.environ + environ['PATH'] = ':'.join([ + join(impl_path, 'activity'), + join(impl_path, 'bin'), + environ['PATH'], + ]) + environ['PYTHONPATH'] = impl_path + ':' + \ + environ.get('PYTHONPATH', '') + environ['SUGAR_BUNDLE_PATH'] = impl_path + environ['SUGAR_BUNDLE_ID'] = context.guid + environ['SUGAR_BUNDLE_NAME'] = \ + toolkit.gettext(context['title']).encode('utf8') + environ['SUGAR_BUNDLE_VERSION'] = sel['version'] + environ['SUGAR_ACTIVITY_ROOT'] = datadir + environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale') + + os.execvpe(args[0], args, environ) + except BaseException: + logging.exception('Failed to execute %r args=%r', sel, args) + finally: + os._exit(1) diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index dfbda6f..55fd92f 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -13,8 +13,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -# pylint: disable=W0611 - import os import logging import httplib @@ -25,7 +23,7 @@ from sugar_network.client import journal, implementations from sugar_network.node.slave import SlaveRoutes from sugar_network.toolkit import netlink, mountpoints from sugar_network.toolkit.router import ACL, Request, Response, Router -from sugar_network.toolkit.router import route, fallbackroute, postroute +from sugar_network.toolkit.router import route, fallbackroute from sugar_network.toolkit import zeroconf, coroutine, http, enforce @@ -74,23 +72,6 @@ class ClientRoutes(model.FrontRoutes, implementations.Routes, journal.Routes): self._got_offline() self._local.volume.close() - @postroute - def postroute(self, request, response, result, error): - if error is None or isinstance(error, http.StatusPass): - return - event = {'event': 'failure', - 'exception': type(error).__name__, - 'error': str(error), - 'method': request.method, - 'cmd': request.cmd, - 'resource': request.resource, - 'guid': request.guid, - 'prop': request.prop, - } - event.update(request) - event.update(request.session) - self.broadcast(event) - @fallbackroute('GET', ['hub']) def hub(self, request, response): """Serve Hub via HTTP instead of file:// for IPC users. diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index 5bb82a1..d31bc5f 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import json import logging import mimetypes from os.path import join, split @@ -146,7 +145,7 @@ class FrontRoutes(object): # a subscription and do not stuck in waiting for the first event, # it should pass `ping` argument to return fake event to unblock # `GET /?cmd=subscribe` call. - yield 'data: %s\n\n' % json.dumps({'event': 'pong'}) + yield {'event': 'pong'} try: while True: @@ -158,7 +157,7 @@ class FrontRoutes(object): elif event.get(key) != value: break else: - yield 'data: %s\n\n' % json.dumps(event) + yield event finally: _logger.debug('Stop pulling events to %s user', peer) diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index ada63da..d57d3ce 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -306,6 +306,8 @@ class Connection(object): def _decode_reply(self, reply): if reply.headers.get('Content-Type') == 'application/json': return json.loads(reply.content) + elif reply.headers.get('Content-Type') == 'text/event-stream': + return _pull_events(reply.raw) else: return reply.content @@ -340,14 +342,7 @@ class _Subscription(object): toolkit.exception('Failed to read from %r subscription, ' 'will resubscribe', self._client.api_url) self._content = None - - if line.startswith('data: '): - try: - return json.loads(line.split(' ', 1)[1]) - except Exception: - toolkit.exception( - 'Failed to parse %r event from %r subscription', - line, self._client.api_url) + return _parse_event(line) def _handshake(self, **params): if self._content is not None: @@ -360,6 +355,24 @@ class _Subscription(object): return self._content +def _pull_events(stream): + while True: + line = toolkit.readline(stream) + if not line: + break + event = _parse_event(line) + if event is not None: + yield event + + +def _parse_event(line): + if line and line.startswith('data: '): + try: + return json.loads(line[6:]) + except Exception: + _logger.exception('Failed to parse %r event', line) + + def _sign(key_path, data): import hashlib from M2Crypto import DSA diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 189556f..b7ba542 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -101,13 +101,13 @@ class Request(dict): subcall = lambda *args: enforce(False) def __init__(self, environ=None, method=None, path=None, cmd=None, - content=None, content_type=None, **kwargs): + content=None, content_type=None, session=None, **kwargs): dict.__init__(self) self.path = [] self.cmd = None self.environ = {} - self.session = {} + self.session = session or {} self._content = _NOT_SET self._dirty_query = False @@ -378,12 +378,13 @@ class Blob(dict): class Router(object): - def __init__(self, routes_model): + def __init__(self, routes_model, allow_spawn=False): + self._routes_model = routes_model + self._allow_spawn = allow_spawn self._valid_origins = set() self._invalid_origins = set() self._host = None self._routes = _Routes() - self._routes_model = routes_model self._preroutes = set() self._postroutes = set() @@ -534,8 +535,12 @@ class Router(object): start_response(response.status, response.items()) if result_streamed: - for i in result: - yield i + if response.content_type == 'text/event-stream': + for event in _event_stream(request, result): + yield 'data: %s\n\n' % json.dumps(event) + else: + for i in result: + yield i elif result is not None: yield result @@ -571,6 +576,11 @@ class Router(object): exception = None try: result = route_.callback(**kwargs) + if route_.mime_type == 'text/event-stream' and \ + self._allow_spawn and 'spawn' in request: + _logger.debug('Spawn event stream for %r', request) + coroutine.spawn(self._event_stream, request, result) + result = None except Exception, exception: raise else: @@ -618,6 +628,20 @@ class Router(object): raise http.NotFound('Path not found') return route_ + def _event_stream(self, request, stream): + commons = {'method': request.method} + if request.cmd: + commons['cmd'] = request.cmd + if request.resource: + commons['resource'] = request.resource + if request.guid: + commons['guid'] = request.guid + if request.prop: + commons['prop'] = request.prop + for event in _event_stream(request, stream): + event.update(commons) + self._routes_model.broadcast(event) + def _assert_origin(self, environ): origin = environ['HTTP_ORIGIN'] if origin in self._valid_origins: @@ -691,6 +715,25 @@ def _stream_reader(stream): stream.close() +def _event_stream(request, stream): + try: + for event in stream: + if type(event) is tuple: + for i in event[1:]: + event[0].update(i) + event = event[0] + yield event + except Exception, error: + _logger.exception('Event stream %r failed', request) + event = {'event': 'failure', + 'exception': type(error).__name__, + 'error': str(error), + } + event.update(request.session) + yield event + _logger.debug('Event stream %r exited', request) + + def _typecast(cast, value): if cast is list or cast is tuple: if isinstance(value, basestring): diff --git a/tests/units/client/cache.py b/tests/units/client/cache.py index be22a22..d8c2dde 100755 --- a/tests/units/client/cache.py +++ b/tests/units/client/cache.py @@ -226,16 +226,16 @@ class CacheTest(tests.Test): 'stability = stable', ]])), cmd='release', initial=True) - conn.get(['context', 'context1'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'context1'], cmd='launch')][-1]['event']) self.assertEqual([impl1], [i for i in self.client_routes._cache]) assert local_volume['implementation'].exists(impl1) - conn.get(['context', 'context2'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'context2'], cmd='launch')][-1]['event']) self.assertEqual([impl2, impl1], [i for i in self.client_routes._cache]) assert local_volume['implementation'].exists(impl1) assert local_volume['implementation'].exists(impl2) - conn.get(['context', 'context3'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'context3'], cmd='launch')][-1]['event']) self.assertEqual([impl3, impl2, impl1], [i for i in self.client_routes._cache]) assert local_volume['implementation'].exists(impl1) assert local_volume['implementation'].exists(impl2) diff --git a/tests/units/client/implementations.py b/tests/units/client/implementations.py index af12d01..3ed4922 100755 --- a/tests/units/client/implementations.py +++ b/tests/units/client/implementations.py @@ -86,7 +86,7 @@ class Implementations(tests.Test): self.override(packagekit, 'resolve', resolve) self.override(packagekit, 'install', install) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) with file('resolve') as f: deps = [pickle.load(f), @@ -157,7 +157,7 @@ class Implementations(tests.Test): }]] cached_path = 'cache/solutions/bu/bundle_id' - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.assertEqual(solution, json.load(file(cached_path))) os.utime(cached_path, (0, 0)) @@ -189,35 +189,35 @@ class Implementations(tests.Test): self.touch([cached_path, solution]) cached_mtime = int(os.stat(cached_path).st_mtime) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) client.api_url.value = 'fake' - self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch') + self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.assertEqual(solution, file(cached_path).read()) client.api_url.value = 'http://127.0.0.1:8888' - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) - self.client_routes._node_mtime = cached_mtime + 1 - self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch') + self.client_routes._node_mtime = cached_mtime + 2 + self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.assertEqual(solution, file(cached_path).read()) self.client_routes._node_mtime = cached_mtime - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) - self.override(packagekit, 'mtime', lambda: cached_mtime + 1) - self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch') + self.override(packagekit, 'mtime', lambda: cached_mtime + 2) + self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.assertEqual(solution, file(cached_path).read()) self.override(packagekit, 'mtime', lambda: cached_mtime) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.touch(('config', [ '[stabilities]', 'bundle_id = buggy', ])) Option.load(['config']) - self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch') + self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.assertEqual(solution, file(cached_path).read()) self.touch(('config', [ @@ -225,7 +225,7 @@ class Implementations(tests.Test): 'bundle_id = stable', ])) Option.load(['config']) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) def test_DeliberateReuseCachedSolutionInOffline(self): self.start_online_client() @@ -251,11 +251,11 @@ class Implementations(tests.Test): self.touch(['cache/solutions/bu/bundle_id', solution]) client.api_url.value = 'fake' - self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch') + self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.node.stop() coroutine.sleep(.1) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) def test_StabilityPreferences(self): self.start_online_client() @@ -293,7 +293,7 @@ class Implementations(tests.Test): ]])), cmd='release') cached_path = 'cache/solutions/bu/bundle_id' - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.assertEqual('1', json.load(file(cached_path))[2][0]['version']) self.touch(('config', [ @@ -301,7 +301,7 @@ class Implementations(tests.Test): 'bundle_id = testing', ])) Option.load(['config']) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.assertEqual('2', json.load(file(cached_path))[2][0]['version']) self.touch(('config', [ @@ -309,7 +309,7 @@ class Implementations(tests.Test): 'bundle_id = testing buggy', ])) Option.load(['config']) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.assertEqual('3', json.load(file(cached_path))[2][0]['version']) self.touch(('config', [ @@ -317,7 +317,7 @@ class Implementations(tests.Test): 'default = testing', ])) Option.load(['config']) - conn.get(['context', 'bundle_id'], cmd='launch') + self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.assertEqual('2', json.load(file(cached_path))[2][0]['version']) def test_LaunchContext(self): @@ -336,7 +336,7 @@ class Implementations(tests.Test): ]], ['TestActivity/bin/activity', [ '#!/bin/sh', - 'cat $2', + 'cat $6', ]], )), cmd='release', initial=True) @@ -358,11 +358,10 @@ class Implementations(tests.Test): 'blob': StringIO('content'), }}) - conn.get(['context', 'document'], cmd='launch', context='bundle_id') + self.assertEqual('exit', [i for i in conn.get(['context', 'document'], cmd='launch', context='bundle_id')][-1]['event']) coroutine.sleep(.1) self.assertEqual('content', file('.sugar/default/logs/bundle_id.log').read()) - if __name__ == '__main__': tests.main() diff --git a/tests/units/client/offline_routes.py b/tests/units/client/offline_routes.py index 2a8692b..d3a11d8 100755 --- a/tests/units/client/offline_routes.py +++ b/tests/units/client/offline_routes.py @@ -297,22 +297,13 @@ class OfflineRoutes(tests.Test): self.node.stop() coroutine.sleep(.1) - def read_events(): - for event in ipc.subscribe(event='!commit'): - events.append(event) - events = [] - coroutine.spawn(read_events) - coroutine.dispatch() - - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) - log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'foo': 'bar', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'foo': 'bar', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'exit', 'activity_id': 'activity_id'}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( [client.api_url.value, ['stable'], solution], @@ -321,26 +312,10 @@ class OfflineRoutes(tests.Test): def test_ServiceUnavailableWhileSolving(self): ipc = self.start_offline_client() - def read_events(): - for event in ipc.subscribe(event='!commit'): - events.append(event) - events = [] - coroutine.spawn(read_events) - coroutine.dispatch() - - self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', 'foo'], cmd='launch') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'GET', - 'guid': 'foo', - 'cmd': 'launch', - 'resource': 'context', - 'prop': None, - 'exception': 'ServiceUnavailable', - 'error': "Resource 'foo' does not exist in 'context'", - }, - events[-1]) + self.assertEqual([ + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': "Resource 'foo' does not exist in 'context'"}, + ], + [i for i in ipc.get(['context', 'foo'], cmd='launch')]) context = ipc.post(['context'], { 'type': 'activity', @@ -348,22 +323,14 @@ class OfflineRoutes(tests.Test): 'summary': 'summary', 'description': 'description', }) - self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'GET', - 'guid': context, - 'cmd': 'launch', - 'resource': 'context', - 'prop': None, - 'exception': 'ServiceUnavailable', - 'error': """\ + self.assertEqual([ + {'event': 'launch', 'activity_id': 'activity_id'}, + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': """\ Can't find all required implementations: - %s -> (problem) - No known implementations at all""" % context, - }, - events[-1]) + No known implementations at all""" % context}, + ], + [i for i in ipc.get(['context', context], cmd='launch')]) impl = ipc.post(['implementation'], { 'context': context, @@ -379,25 +346,15 @@ Can't find all required implementations: }, }, }}) - - self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'GET', - 'guid': context, - 'cmd': 'launch', - 'resource': 'context', - 'prop': None, - 'exception': 'ServiceUnavailable', - 'error': """\ + self.assertEqual([ + {'event': 'launch', 'activity_id': 'activity_id'}, + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': """\ Can't find all required implementations: - %s -> 1 (%s) - dep -> (problem) - No known implementations at all""" % (context, impl), - }, - events[-1]) - + No known implementations at all""" % (context, impl)}, + ], + [i for i in ipc.get(['context', context], cmd='launch')]) assert not exists('cache/solutions/%s/%s' % (context[:2], context)) def test_ServiceUnavailableWhileInstalling(self): @@ -441,42 +398,29 @@ Can't find all required implementations: return dict([(i, {'name': i, 'pk_id': i, 'version': '0', 'arch': '*', 'installed': False}) for i in names]) self.override(packagekit, 'resolve', resolve) - def read_events(): - for event in ipc.subscribe(event='!commit'): - events.append(event) - events = [] - coroutine.spawn(read_events) - coroutine.dispatch() - - self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'GET', - 'guid': context, - 'cmd': 'launch', - 'resource': 'context', - 'prop': None, - 'exception': 'ServiceUnavailable', - 'error': 'Installation is not available in offline', - 'solution': [ - { 'guid': impl, - 'context': context, - 'license': ['GPLv3+'], - 'stability': 'stable', - 'version': '1', - 'command': ['true'], - }, - { 'guid': 'dep', - 'context': 'dep', - 'install': [{'arch': '*', 'installed': False, 'name': 'dep.bin', 'pk_id': 'dep.bin', 'version': '0'}], - 'license': None, - 'stability': 'packaged', - 'version': '0', - }, - ], - }, - events[-1]) + self.assertEqual([ + {'event': 'launch', 'activity_id': 'activity_id'}, + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': 'Installation is not available in offline', + 'stability': ['stable'], + 'solution': [ + { 'guid': impl, + 'context': context, + 'license': ['GPLv3+'], + 'stability': 'stable', + 'version': '1', + 'command': ['true'], + }, + { 'guid': 'dep', + 'context': 'dep', + 'install': [{'arch': '*', 'installed': False, 'name': 'dep.bin', 'pk_id': 'dep.bin', 'version': '0'}], + 'license': None, + 'stability': 'packaged', + 'version': '0', + }, + ], + }, + ], + [i for i in ipc.get(['context', context], cmd='launch')]) def test_NoAuthors(self): ipc = self.start_offline_client() diff --git a/tests/units/client/online_routes.py b/tests/units/client/online_routes.py index 79b01cb..f4e052e 100755 --- a/tests/units/client/online_routes.py +++ b/tests/units/client/online_routes.py @@ -4,6 +4,7 @@ import os import json import time +import copy import shutil import zipfile from cStringIO import StringIO @@ -364,13 +365,11 @@ class OnlineRoutes(tests.Test): def test_clone_Fails(self): self.start_online_client([User, Context, Implementation]) conn = IPCConnection() - events = [] - def read_events(): - for event in conn.subscribe(event='!commit'): - events.append(event) - coroutine.spawn(read_events) - coroutine.dispatch() + self.assertEqual([ + {'event': 'failure', 'exception': 'NotFound', 'error': "Resource 'foo' does not exist in 'context'"}, + ], + [i for i in conn.put(['context', 'foo'], True, cmd='clone')]) context = conn.post(['context'], { 'type': 'activity', @@ -379,22 +378,14 @@ class OnlineRoutes(tests.Test): 'description': 'description', }) - self.assertRaises(http.NotFound, conn.put, ['context', context], True, cmd='clone') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'PUT', - 'cmd': 'clone', - 'resource': 'context', - 'guid': context, - 'prop': None, - 'exception': 'NotFound', - 'error': """\ + self.assertEqual([ + {'event': 'failure', 'exception': 'NotFound', 'error': """\ Can't find all required implementations: - %s -> (problem) - No known implementations at all""" % context, - }, - events[-1]) + No known implementations at all""" % context}, + ], + [i for i in conn.put(['context', context], True, cmd='clone')]) + assert not exists('cache/solutions/%s/%s' % (context[:2], context)) impl = conn.post(['implementation'], { @@ -417,29 +408,22 @@ Can't find all required implementations: }, }}) - self.assertRaises(http.NotFound, conn.put, ['context', context], True, cmd='clone') - coroutine.dispatch() - self.assertEqual({ - 'event': 'failure', - 'method': 'PUT', - 'cmd': 'clone', - 'resource': 'context', - 'guid': context, - 'prop': None, - 'exception': 'NotFound', - 'error': 'BLOB does not exist', - 'solution': [{ - 'command': ['echo'], - 'context': context, - 'guid': impl, - 'license': ['GPLv3+'], - 'extract': 'topdir', - 'stability': 'stable', - 'version': '1', - 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl), - }], - }, - events[-1]) + self.assertEqual([ + {'event': 'failure', 'exception': 'NotFound', 'error': 'BLOB does not exist', + 'stability': ['stable'], + 'solution': [{ + 'command': ['echo'], + 'context': context, + 'guid': impl, + 'license': ['GPLv3+'], + 'extract': 'topdir', + 'stability': 'stable', + 'version': '1', + 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl), + }], + }, + ], + [i for i in conn.put(['context', context], True, cmd='clone')]) assert not exists('cache/solutions/%s/%s' % (context[:2], context)) def test_clone_Content(self): @@ -470,8 +454,10 @@ Can't find all required implementations: self.node_volume['implementation'].update(impl, {'data': {'blob': StringIO(blob), 'foo': 'bar'}}) clone_path = 'client/context/%s/%s/.clone' % (context[:2], context) - ipc.put(['context', context], True, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + {'event': 'ready'}, + ], + [i for i in ipc.put(['context', context], True, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -520,8 +506,9 @@ Can't find all required implementations: assert exists(clone_path + '/data.blob') assert not exists('cache/solutions/%s/%s' % (context[:2], context)) - ipc.put(['context', context], False, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + ], + [i for i in ipc.put(['context', context], False, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -554,8 +541,10 @@ Can't find all required implementations: assert not lexists(clone_path) assert not exists('cache/solutions/%s/%s' % (context[:2], context)) - ipc.put(['context', context], True, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + {'event': 'ready'}, + ], + [i for i in ipc.put(['context', context], True, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -601,11 +590,14 @@ Can't find all required implementations: 'stability': 'stable', 'version': '1', 'command': ['true'], - 'path': blob_path, }] + downloaded_solution = copy.deepcopy(solution) + downloaded_solution[0]['path'] = blob_path - ipc.put(['context', 'bundle_id'], True, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + {'event': 'ready'}, + ], + [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -657,11 +649,12 @@ Can't find all required implementations: self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read()) assert exists(clone_path + '/data.blob/activity/activity.info') self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api_url.value, ['stable'], downloaded_solution], json.load(file('cache/solutions/bu/bundle_id'))) - ipc.put(['context', 'bundle_id'], False, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + ], + [i for i in ipc.put(['context', 'bundle_id'], False, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -700,11 +693,13 @@ Can't find all required implementations: self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read()) assert not exists(clone_path) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api_url.value, ['stable'], downloaded_solution], json.load(file('cache/solutions/bu/bundle_id'))) - ipc.put(['context', 'bundle_id'], True, cmd='clone') - coroutine.dispatch() + self.assertEqual([ + {'event': 'ready'}, + ], + [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')]) self.assertEqual({ 'event': 'update', @@ -717,7 +712,7 @@ Can't find all required implementations: sorted(ipc.get(['context'], reply='layer')['result'])) assert exists(clone_path + '/data.blob/activity/activity.info') self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api_url.value, ['stable'], downloaded_solution], json.load(file('cache/solutions/bu/bundle_id'))) def test_clone_ActivityWithStabilityPreferences(self): @@ -749,7 +744,10 @@ Can't find all required implementations: blob2 = self.zips(['TestActivity/activity/activity.info', activity_info2]) impl2 = ipc.upload(['implementation'], StringIO(blob2), cmd='release', initial=True) - ipc.put(['context', 'bundle_id'], True, cmd='clone') + self.assertEqual( + 'ready', + [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event']) + coroutine.dispatch() self.assertEqual({'layer': ['clone']}, ipc.get(['context', 'bundle_id'], reply='layer')) self.assertEqual([impl1], [i.guid for i in local['implementation'].find()[0]]) @@ -761,8 +759,13 @@ Can't find all required implementations: ])) Option.load(['config']) - ipc.put(['context', 'bundle_id'], False, cmd='clone') - ipc.put(['context', 'bundle_id'], True, cmd='clone') + self.assertEqual( + [], + [i for i in ipc.put(['context', 'bundle_id'], False, cmd='clone')]) + self.assertEqual( + 'ready', + [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event']) + coroutine.dispatch() self.assertEqual({'layer': ['clone']}, ipc.get(['context', 'bundle_id'], reply='layer')) self.assertEqual([impl1, impl2], [i.guid for i in local['implementation'].find()[0]]) @@ -803,8 +806,9 @@ Can't find all required implementations: }, ipc.head(['context', 'bundle_id'], cmd='clone')) - ipc.put(['context', 'bundle_id'], True, cmd='clone') - coroutine.dispatch() + self.assertEqual( + 'ready', + [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event']) blob_path = tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl) self.assertEqual({ @@ -842,16 +846,6 @@ Can't find all required implementations: impl = ipc.upload(['implementation'], StringIO(blob), cmd='release', initial=True) coroutine.sleep(.1) - def read_events(): - for event in ipc.subscribe(event='!commit'): - events.append(event) - events = [] - coroutine.spawn(read_events) - coroutine.dispatch() - - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) - solution = [{ 'guid': impl, 'context': 'bundle_id', @@ -860,17 +854,19 @@ Can't find all required implementations: 'stability': 'stable', 'version': '1', 'command': ['true'], - 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl), }] + downloaded_solution = copy.deepcopy(solution) + downloaded_solution[0]['path'] = tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl) log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'exit', 'activity_id': 'activity_id'}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api_url.value, ['stable'], downloaded_solution], json.load(file('cache/solutions/bu/bundle_id'))) blob = self.zips(['TestActivity/activity/activity.info', [ @@ -886,10 +882,6 @@ Can't find all required implementations: coroutine.sleep(.1) shutil.rmtree('cache/solutions') - del events[:] - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) - solution = [{ 'guid': impl, 'context': 'bundle_id', @@ -902,10 +894,11 @@ Can't find all required implementations: }] log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_1.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'exit', 'activity_id': 'activity_id'}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( [client.api_url.value, ['stable'], solution], @@ -914,41 +907,56 @@ Can't find all required implementations: self.node.stop() coroutine.sleep(.1) - del events[:] - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) - log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_2.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'exit', 'activity_id': 'activity_id'}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( [client.api_url.value, ['stable'], solution], json.load(file('cache/solutions/bu/bundle_id'))) shutil.rmtree('cache/solutions') - del events[:] - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) - log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_3.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'exit', 'activity_id': 'activity_id'}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( [client.api_url.value, ['stable'], solution], json.load(file('cache/solutions/bu/bundle_id'))) - def test_launch_ActivityFailed(self): + def test_launch_Fails(self): local = self.start_online_client([User, Context, Implementation]) ipc = IPCConnection() + self.assertEqual([ + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'NotFound', 'error': "Resource 'foo' does not exist in 'context'"}, + ], + [i for i in ipc.get(['context', 'foo'], cmd='launch')]) + + ipc.post(['context'], { + 'guid': 'bundle_id', + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + self.assertEqual([ + {'event': 'launch', 'activity_id': 'activity_id', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'NotFound', 'error': """\ +Can't find all required implementations: +- bundle_id -> (problem) + No known implementations at all"""}, + ], + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) + activity_info = '\n'.join([ '[Activity]', 'name = TestActivity', @@ -960,17 +968,6 @@ Can't find all required implementations: ]) blob = self.zips(['TestActivity/activity/activity.info', activity_info]) impl = ipc.upload(['implementation'], StringIO(blob), cmd='release', initial=True) - coroutine.sleep(.1) - - def read_events(): - for event in ipc.subscribe(event='!commit'): - events.append(event) - events = [] - coroutine.spawn(read_events) - coroutine.dispatch() - - ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar') - coroutine.sleep(.1) solution = [{ 'guid': impl, @@ -982,12 +979,20 @@ Can't find all required implementations: 'command': ['false'], 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl), }] - log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log' self.assertEqual([ - {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, - {'event': 'failure', 'error': 'Process exited with 1 status', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution}, + {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'}, + {'event': 'exec', 'activity_id': 'activity_id'}, + {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'RuntimeError', 'error': 'Process exited with 1 status', + 'stability': ['stable'], + 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'], + 'solution': solution, + 'logs': [ + tests.tmpdir + '/.sugar/default/logs/shell.log', + tests.tmpdir + '/.sugar/default/logs/sugar-network-client.log', + tests.tmpdir + '/.sugar/default/logs/bundle_id.log', + ]}, ], - events) + [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['implementation'].exists(impl) self.assertEqual( [client.api_url.value, ['stable'], solution], diff --git a/tests/units/model/routes.py b/tests/units/model/routes.py index 04491ee..9cad606 100755 --- a/tests/units/model/routes.py +++ b/tests/units/model/routes.py @@ -54,11 +54,6 @@ class RoutesTest(tests.Test): def read_events(): for event in routes.subscribe(event='!commit'): - if not event.strip(): - continue - assert event.startswith('data: ') - assert event.endswith('\n\n') - event = json.loads(event[6:]) events.append(event) job = coroutine.spawn(read_events) @@ -84,8 +79,7 @@ class RoutesTest(tests.Test): routes = model.FrontRoutes() for event in routes.subscribe(ping=True): break - self.assertEqual('data: {"event": "pong"}\n\n', event) - + self.assertEqual({'event': 'pong'}, event) if __name__ == '__main__': diff --git a/tests/units/node/node.py b/tests/units/node/node.py index e163912..3506495 100755 --- a/tests/units/node/node.py +++ b/tests/units/node/node.py @@ -159,7 +159,7 @@ class NodeTest(tests.Test): def subscribe(): for event in cp.subscribe(): - events.append(json.loads(event[6:])) + events.append(event) events = [] coroutine.spawn(subscribe) coroutine.dispatch() @@ -184,7 +184,7 @@ class NodeTest(tests.Test): def subscribe(): for event in cp.subscribe(): - events.append(json.loads(event[6:])) + events.append(event) events = [] coroutine.spawn(subscribe) coroutine.dispatch() diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py index b970a90..ad90cf5 100755 --- a/tests/units/toolkit/router.py +++ b/tests/units/toolkit/router.py @@ -10,7 +10,7 @@ from __init__ import tests, src_root from sugar_network import db from sugar_network.toolkit.router import Blob, Router, Request, _parse_accept_language, route, fallbackroute, preroute, postroute, _filename -from sugar_network.toolkit import default_lang, http +from sugar_network.toolkit import default_lang, http, coroutine class RouterTest(tests.Test): @@ -1266,6 +1266,94 @@ class RouterTest(tests.Test): ], response) + def test_EventStream(self): + + class Routes(object): + + @route('GET', mime_type='text/event-stream') + def get(self): + yield None + yield 0 + yield -1 + yield '2' + yield {'3': 4} + + reply = Router(Routes())({ + 'PATH_INFO': '/', + 'REQUEST_METHOD': 'GET', + }, + lambda status, headers: None) + + self.assertEqual([ + 'data: null\n\n', + 'data: 0\n\n', + 'data: -1\n\n', + 'data: "2"\n\n', + 'data: {"3": 4}\n\n', + ], + [i for i in reply]) + + def test_SpawnEventStream(self): + events = [] + + class Routes(object): + + @route('GET', [None, None, None], cmd='cmd', mime_type='text/event-stream') + def ok(self): + yield {} + yield {'foo': 'bar'} + + def broadcast(self, event): + events.append(event.copy()) + + reply = Router(Routes(), allow_spawn=True)({ + 'PATH_INFO': '/resource/guid/prop', + 'REQUEST_METHOD': 'GET', + 'QUERY_STRING': 'cmd=cmd&spawn&arg', + }, + lambda status, headers: None) + self.assertEqual([], [i for i in reply]) + + coroutine.sleep(.1) + self.assertEqual([ + {'method': 'GET', 'resource': 'resource', 'guid': 'guid', 'prop': 'prop', 'cmd': 'cmd'}, + {'method': 'GET', 'resource': 'resource', 'guid': 'guid', 'prop': 'prop', 'cmd': 'cmd', 'foo': 'bar'}, + ], + events) + del events[:] + + def test_SpawnEventStreamFailure(self): + events = [] + + class Routes(object): + + @route('GET', mime_type='text/event-stream') + def error(self, request): + request.session['bar'] = 'foo' + yield {} + yield {'foo': 'bar'}, {'add': 'on'} + raise RuntimeError('error') + + def broadcast(self, event): + events.append(event.copy()) + + reply = Router(Routes(), allow_spawn=True)({ + 'PATH_INFO': '/', + 'REQUEST_METHOD': 'GET', + 'QUERY_STRING': 'spawn', + }, + lambda status, headers: None) + self.assertEqual([], [i for i in reply]) + + coroutine.sleep(.1) + self.assertEqual([ + {'method': 'GET'}, + {'method': 'GET', 'foo': 'bar', 'add': 'on'}, + {'method': 'GET', 'bar': 'foo', 'event': 'failure', 'exception': 'RuntimeError', 'error': 'error'}, + ], + events) + del events[:] + if __name__ == '__main__': tests.main() -- cgit v0.9.1