author    Aleksey Lim <alsroot@sugarlabs.org>    2013-09-01 08:07:14 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>    2013-09-01 08:52:28 (GMT)
commit    dbf9be353818e17a76df5da8e5ffe3f617d29b67 (patch)
tree      3647f66e1f558b8aeb6156eb76d4372ce21c88ef
parent    b4e415ab9f70373ce1d5572467d58f95b5a9c755 (diff)
Process event streams at a low level in the router; make the launch and clone API routes event-streamable
-rwxr-xr-x  sugar-network  50
-rwxr-xr-x  sugar-network-client  2
-rw-r--r--  sugar_network/client/cache.py  5
-rw-r--r--  sugar_network/client/implementations.py  251
-rw-r--r--  sugar_network/client/routes.py  21
-rw-r--r--  sugar_network/model/routes.py  5
-rw-r--r--  sugar_network/toolkit/http.py  29
-rw-r--r--  sugar_network/toolkit/router.py  55
-rwxr-xr-x  tests/units/client/cache.py  6
-rwxr-xr-x  tests/units/client/implementations.py  43
-rwxr-xr-x  tests/units/client/offline_routes.py  142
-rwxr-xr-x  tests/units/client/online_routes.py  237
-rwxr-xr-x  tests/units/model/routes.py  8
-rwxr-xr-x  tests/units/node/node.py  4
-rwxr-xr-x  tests/units/toolkit/router.py  90
15 files changed, 511 insertions(+), 437 deletions(-)
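
The core of this change: routes declared with mime_type='text/event-stream' now yield plain dicts, and the router frames each one as a Server-Sent-Events "data:" line (see sugar_network/toolkit/router.py below). A minimal standalone sketch of that framing, assuming nothing beyond the standard library; frame_events() is illustrative, not project API:

    import json

    def frame_events(events):
        # What Router.__call__ now does for 'text/event-stream' responses:
        # serialize each yielded dict as one SSE frame.
        for event in events:
            yield 'data: %s\n\n' % json.dumps(event)

    assert list(frame_events([{'event': 'pong'}])) == ['data: {"event": "pong"}\n\n']
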
diff --git a/sugar-network b/sugar-network
index 642af7f..cc56837 100755
--- a/sugar-network
+++ b/sugar-network
@@ -66,12 +66,11 @@ _LIST_RE = re.compile(r'\s*[;,:]+\s*')
class ClientRouter(Router, ClientRoutes):
def __init__(self):
- home = db.Volume(client.path('db'), RESOURCES)
+ home = db.Volume(client.path('db'), RESOURCES, lazy_open=True)
Router.__init__(self, self)
ClientRoutes.__init__(self, home,
client.api_url.value if not offline.value else None,
no_subscription=True)
-
if not offline.value:
for __ in self.subscribe(event='inline', state='online'):
break
@@ -261,6 +260,30 @@ class Application(application.Application):
toolkit.ensure_key(client.key_path())
cp = ClientRouter()
result = cp.call(request, response)
+
+ if result is None:
+ pass
+ elif response.content_type == 'application/json':
+ self._dump(result)
+ elif isinstance(result, types.GeneratorType):
+ for chunk in result:
+ self._dump(chunk)
+ elif hasattr(result, 'read'):
+ if response.content_type == 'text/event-stream':
+ while True:
+ chunk = toolkit.readline(result)
+ if not chunk:
+ break
+ if chunk.startswith('data: '):
+ self._dump(loads(chunk[6:]))
+ else:
+ while True:
+ chunk = result.read(BUFFER_SIZE)
+ if not chunk:
+ break
+ sys.stdout.write(chunk)
+ else:
+ sys.stdout.write(result)
finally:
if server is not None:
server.close()
@@ -269,29 +292,6 @@ class Application(application.Application):
if pid_path:
os.unlink(pid_path)
- if result is None:
- return
-
- if response.content_type == 'application/json':
- self._dump(result)
- elif response.content_type == 'text/event-stream':
- while True:
- chunk = toolkit.readline(result)
- if not chunk:
- break
- sys.stdout.write(chunk)
- elif hasattr(result, 'read'):
- while True:
- chunk = result.read(BUFFER_SIZE)
- if not chunk:
- break
- sys.stdout.write(chunk)
- elif isinstance(result, types.GeneratorType):
- for chunk in result:
- sys.stdout.write(chunk)
- else:
- sys.stdout.write(result)
-
def _parse_path(self, request):
if self.args and self.args[0].startswith('/'):
request.path = self.args.pop(0).strip('/').split('/')
diff --git a/sugar-network-client b/sugar-network-client
index a528f27..2da4030 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -104,7 +104,7 @@ class Application(application.Daemon):
volume = db.Volume(client.path('db'), RESOURCES, lazy_open=True)
routes = CachedClientRoutes(volume,
client.api_url.value if not client.server_mode.value else None)
- router = Router(routes)
+ router = Router(routes, allow_spawn=True)
logging.info('Listening for IPC requests on %s port',
client.ipc_port.value)
diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py
index 09eb40a..3043d25 100644
--- a/sugar_network/client/cache.py
+++ b/sugar_network/client/cache.py
@@ -63,15 +63,14 @@ class Cache(object):
if to_free <= 0:
break
- def checkin(self, guid, meta=None):
+ def checkin(self, guid):
self._ensure_open()
if guid in self._pool:
self._pool.__getitem__(guid)
return
_logger.debug('Checkin %r', guid)
impls = self._volume['implementation']
- if meta is None:
- meta = impls.get(guid).meta('data')
+ meta = impls.get(guid).meta('data')
size = meta.get('unpack_size') or meta['blob_size']
mtime = os.stat(impls.path(guid)).st_mtime
self._pool[guid] = (size, mtime)
diff --git a/sugar_network/client/implementations.py b/sugar_network/client/implementations.py
index 34a9145..84c5128 100644
--- a/sugar_network/client/implementations.py
+++ b/sugar_network/client/implementations.py
@@ -13,7 +13,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-# pylint: disable-msg=E1101,W0611
+# pylint: disable=E1101
import os
import re
@@ -30,9 +30,9 @@ from os.path import join, exists, basename, dirname, relpath
from sugar_network import client, toolkit
from sugar_network.client.cache import Cache
from sugar_network.client import journal, packagekit
-from sugar_network.toolkit.router import Request, Response, route, postroute
+from sugar_network.toolkit.router import Request, Response, route
from sugar_network.toolkit.bundle import Bundle
-from sugar_network.toolkit import http, coroutine, exception, enforce
+from sugar_network.toolkit import http, coroutine, enforce
_MIMETYPE_DEFAULTS_KEY = '/desktop/sugar/journal/defaults'
@@ -53,33 +53,53 @@ class Routes(object):
def invalidate_solutions(self, mtime):
self._node_mtime = mtime
- @route('GET', ['context', None], cmd='launch', arguments={'args': list})
+ @route('GET', ['context', None], cmd='launch', arguments={'args': list},
+ mime_type='text/event-stream')
def launch(self, request, no_spawn):
+ activity_id = request.get('activity_id')
+ if 'object_id' in request and not activity_id:
+ activity_id = journal.get(request['object_id'], 'activity_id')
+ if not activity_id:
+ activity_id = _activity_id_new()
+ request.session['activity_id'] = activity_id
+
for context in self._checkin_context(request):
- impl = self._checkin_impl(context, request)
- if 'activity' in context['type']:
- self._exec(request, context, impl)
- else:
+ yield {'event': 'launch', 'activity_id': activity_id}, request
+
+ impl = self._solve_impl(context, request)
+ if 'activity' not in context['type']:
app = request.get('context') or \
_mimetype_context(impl['mime_type'])
enforce(app, 'Cannot find proper application')
- doc = self._volume['implementation'].path(impl['guid'], 'data')
- app_request = Request(path=['context', app], object_id=doc)
- for app_context in self._checkin_context(app_request):
- app_impl = self._checkin_impl(app_context, app_request)
- self._exec(app_request, app_context, app_impl)
-
- @route('PUT', ['context', None], cmd='clone', arguments={'requires': list})
+ self._checkin_impl(context, request, impl)
+ request = Request(path=['context', app],
+ object_id=impl['path'], session=request.session)
+ for context in self._checkin_context(request):
+ impl = self._solve_impl(context, request)
+ self._checkin_impl(context, request, impl)
+
+ child = _exec(context, request, impl)
+ yield {'event': 'exec', 'activity_id': activity_id}
+
+ status = child.wait()
+ _logger.debug('Exit %s[%s]: %r', context.guid, child.pid, status)
+ enforce(status == 0, 'Process exited with %r status', status)
+ yield {'event': 'exit', 'activity_id': activity_id}
+
+ @route('PUT', ['context', None], cmd='clone', arguments={'requires': list},
+ mime_type='text/event-stream')
def clone(self, request):
enforce(not request.content or self.inline(), http.ServiceUnavailable,
'Not available in offline')
for context in self._checkin_context(request, 'clone'):
cloned_path = context.path('.clone')
if request.content:
- impl = self._checkin_impl(context, request)
+ impl = self._solve_impl(context, request)
+ self._checkin_impl(context, request, impl)
impl_path = relpath(dirname(impl['path']), context.path())
os.symlink(impl_path, cloned_path)
self._cache.checkout(impl['guid'])
+ yield {'event': 'ready'}
else:
cloned_impl = basename(os.readlink(cloned_path))
self._cache.checkin(cloned_impl)
@@ -145,20 +165,19 @@ class Routes(object):
})
_logger.debug('Checked %r in: %r', guid, layer_value)
- def _checkin_impl(self, context, request, clone=None):
+ def _solve_impl(self, context, request):
stability = request.get('stability') or \
client.stability(request.guid)
+ _logger.debug('Solving %r stability=%r', request.guid, stability)
+
if 'activity' not in context['type']:
_logger.debug('Cloniing %r', request.guid)
response = Response()
blob = self._call(method='GET', path=['context', request.guid],
cmd='clone', stability=stability, response=response)
- impl = response.meta
- self._cache_impl(context, impl, blob, impl.pop('data'))
- return impl
-
- _logger.debug('Making %r', request.guid)
+ response.meta['data']['blob'] = blob
+ return response.meta
solution, stale = self._cache_solution_get(request.guid, stability)
if stale is False:
@@ -171,9 +190,16 @@ class Routes(object):
solution = self._map_exceptions(solver.solve,
self.fallback, request.guid, stability)
request.session['solution'] = solution
+ request.session['stability'] = stability
+ return solution[0]
+
+ def _checkin_impl(self, context, request, sel):
+ if 'activity' not in context['type']:
+ self._cache_impl(context, sel)
+ return
to_install = []
- for sel in solution:
+ for sel in request.session['solution']:
if 'install' in sel:
enforce(self.inline(), http.ServiceUnavailable,
'Installation is not available in offline')
@@ -181,95 +207,12 @@ class Routes(object):
if to_install:
packagekit.install(to_install)
- for sel in solution:
+ for sel in request.session['solution']:
if 'path' not in sel and sel['stability'] != 'packaged':
self._cache_impl(context, sel)
- self._cache_solution(request.guid, stability, solution)
- return solution[0]
-
- def _exec(self, request, context, sel):
- # pylint: disable-msg=W0212
- datadir = client.profile_path('data', context.guid)
- logdir = client.profile_path('logs')
-
- args = sel['command'] + (request.get('args') or [])
- object_id = request.get('object_id')
- if object_id:
- if 'activity_id' not in request:
- activity_id = journal.get(object_id, 'activity_id')
- if activity_id:
- request['activity_id'] = activity_id
- args.extend(['-o', object_id])
- activity_id = request.get('activity_id')
- if not activity_id:
- activity_id = request['activity_id'] = _activity_id_new()
- uri = request.get('uri')
- if uri:
- args.extend(['-u', uri])
- args.extend([
- '-b', request.guid,
- '-a', activity_id,
- ])
-
- for path in [
- join(datadir, 'instance'),
- join(datadir, 'data'),
- join(datadir, 'tmp'),
- logdir,
- ]:
- if not exists(path):
- os.makedirs(path)
-
- event = {'event': 'exec',
- 'cmd': 'launch',
- 'guid': request.guid,
- 'args': args,
- 'log_path':
- toolkit.unique_filename(logdir, context.guid + '.log'),
- }
- event.update(request)
- event.update(request.session)
- self.broadcast(event)
-
- child = coroutine.fork()
- if child is not None:
- _logger.debug('Exec %s[%s]: %r', request.guid, child.pid, args)
- child.watch(self.__sigchld_cb, child.pid, event)
- return
-
- try:
- with file('/dev/null', 'r') as f:
- os.dup2(f.fileno(), 0)
- with file(event['log_path'], 'a+') as f:
- os.dup2(f.fileno(), 1)
- os.dup2(f.fileno(), 2)
- toolkit.init_logging()
-
- impl_path = sel['path']
- os.chdir(impl_path)
-
- environ = os.environ
- environ['PATH'] = ':'.join([
- join(impl_path, 'activity'),
- join(impl_path, 'bin'),
- environ['PATH'],
- ])
- environ['PYTHONPATH'] = impl_path + ':' + \
- environ.get('PYTHONPATH', '')
- environ['SUGAR_BUNDLE_PATH'] = impl_path
- environ['SUGAR_BUNDLE_ID'] = context.guid
- environ['SUGAR_BUNDLE_NAME'] = \
- toolkit.gettext(context['title']).encode('utf8')
- environ['SUGAR_BUNDLE_VERSION'] = sel['version']
- environ['SUGAR_ACTIVITY_ROOT'] = datadir
- environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale')
-
- os.execvpe(args[0], args, environ)
- except BaseException:
- logging.exception('Failed to execute %r args=%r', sel, args)
- finally:
- os._exit(1)
+ self._cache_solution(context.guid,
+ request.session['stability'], request.session['solution'])
def _cache_solution_path(self, guid):
return client.path('cache', 'solutions', guid[:2], guid)
@@ -303,16 +246,19 @@ class Routes(object):
with file(path, 'w') as f:
json.dump([client.api_url.value, stability, solution], f)
- def _cache_impl(self, context, sel, blob=None, data=None):
+ def _cache_impl(self, context, sel):
guid = sel['guid']
impls = self._volume['implementation']
data_path = sel['path'] = impls.path(guid, 'data')
if impls.exists(guid):
- self._cache.checkin(guid, data)
+ self._cache.checkin(guid)
return
- if blob is None:
+ if 'data' in sel:
+ data = sel.pop('data')
+ blob = data.pop('blob')
+ else:
response = Response()
blob = self._call(method='GET',
path=['implementation', guid, 'data'], response=response)
@@ -360,15 +306,6 @@ class Routes(object):
'guid', 'context', 'license', 'version', 'stability', 'data'])
return impl.meta('data')
- def __sigchld_cb(self, returncode, pid, event):
- _logger.debug('Exit %s[%s]: %r', event['guid'], pid, returncode)
- if returncode:
- event['event'] = 'failure'
- event['error'] = 'Process exited with %r status' % returncode
- else:
- event['event'] = 'exit'
- self.broadcast(event)
-
def _activity_id_new():
data = '%s%s%s' % (
@@ -383,3 +320,75 @@ def _mimetype_context(mime_type):
mime_type = _MIMETYPE_INVALID_CHARS.sub('_', mime_type)
key = '/'.join([_MIMETYPE_DEFAULTS_KEY, mime_type])
return gconf.client_get_default().get_string(key)
+
+
+def _exec(context, request, sel):
+ # pylint: disable-msg=W0212
+ datadir = client.profile_path('data', context.guid)
+ logdir = client.profile_path('logs')
+
+ for path in [
+ join(datadir, 'instance'),
+ join(datadir, 'data'),
+ join(datadir, 'tmp'),
+ logdir,
+ ]:
+ if not exists(path):
+ os.makedirs(path)
+
+ args = sel['command'] + [
+ '-b', request.guid,
+ '-a', request.session['activity_id'],
+ ]
+ if 'object_id' in request:
+ args.extend(['-o', request['object_id']])
+ if 'uri' in request:
+ args.extend(['-u', request['uri']])
+ if 'args' in request:
+ args.extend(request['args'])
+ request.session['args'] = args
+
+ log_path = toolkit.unique_filename(logdir, context.guid + '.log')
+ request.session['logs'] = [
+ join(logdir, 'shell.log'),
+ join(logdir, 'sugar-network-client.log'),
+ log_path,
+ ]
+
+ child = coroutine.fork()
+ if child is not None:
+ _logger.debug('Exec %s[%s]: %r', request.guid, child.pid, args)
+ return child
+
+ try:
+ with file('/dev/null', 'r') as f:
+ os.dup2(f.fileno(), 0)
+ with file(log_path, 'a+') as f:
+ os.dup2(f.fileno(), 1)
+ os.dup2(f.fileno(), 2)
+ toolkit.init_logging()
+
+ impl_path = sel['path']
+ os.chdir(impl_path)
+
+ environ = os.environ
+ environ['PATH'] = ':'.join([
+ join(impl_path, 'activity'),
+ join(impl_path, 'bin'),
+ environ['PATH'],
+ ])
+ environ['PYTHONPATH'] = impl_path + ':' + \
+ environ.get('PYTHONPATH', '')
+ environ['SUGAR_BUNDLE_PATH'] = impl_path
+ environ['SUGAR_BUNDLE_ID'] = context.guid
+ environ['SUGAR_BUNDLE_NAME'] = \
+ toolkit.gettext(context['title']).encode('utf8')
+ environ['SUGAR_BUNDLE_VERSION'] = sel['version']
+ environ['SUGAR_ACTIVITY_ROOT'] = datadir
+ environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale')
+
+ os.execvpe(args[0], args, environ)
+ except BaseException:
+ logging.exception('Failed to execute %r args=%r', sel, args)
+ finally:
+ os._exit(1)
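
With the hunks above, launch emits a launch/exec/exit event sequence and clone emits a ready event; on errors the router appends a failure event (see the tests further down). A hedged sketch of how a caller might drain such a stream; the sample events and the wait_for_exit() helper are illustrative only, not part of the project:

    def wait_for_exit(events):
        # Consume launch events until the terminal one.
        for event in events:
            if event['event'] == 'exit':
                return 0
            if event['event'] == 'failure':
                raise RuntimeError(event.get('error'))

    sample = [
        {'event': 'launch', 'activity_id': 'xx'},
        {'event': 'exec', 'activity_id': 'xx'},
        {'event': 'exit', 'activity_id': 'xx'},
    ]
    assert wait_for_exit(iter(sample)) == 0
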
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py
index dfbda6f..55fd92f 100644
--- a/sugar_network/client/routes.py
+++ b/sugar_network/client/routes.py
@@ -13,8 +13,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-# pylint: disable=W0611
-
import os
import logging
import httplib
@@ -25,7 +23,7 @@ from sugar_network.client import journal, implementations
from sugar_network.node.slave import SlaveRoutes
from sugar_network.toolkit import netlink, mountpoints
from sugar_network.toolkit.router import ACL, Request, Response, Router
-from sugar_network.toolkit.router import route, fallbackroute, postroute
+from sugar_network.toolkit.router import route, fallbackroute
from sugar_network.toolkit import zeroconf, coroutine, http, enforce
@@ -74,23 +72,6 @@ class ClientRoutes(model.FrontRoutes, implementations.Routes, journal.Routes):
self._got_offline()
self._local.volume.close()
- @postroute
- def postroute(self, request, response, result, error):
- if error is None or isinstance(error, http.StatusPass):
- return
- event = {'event': 'failure',
- 'exception': type(error).__name__,
- 'error': str(error),
- 'method': request.method,
- 'cmd': request.cmd,
- 'resource': request.resource,
- 'guid': request.guid,
- 'prop': request.prop,
- }
- event.update(request)
- event.update(request.session)
- self.broadcast(event)
-
@fallbackroute('GET', ['hub'])
def hub(self, request, response):
"""Serve Hub via HTTP instead of file:// for IPC users.
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index 5bb82a1..d31bc5f 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
import logging
import mimetypes
from os.path import join, split
@@ -146,7 +145,7 @@ class FrontRoutes(object):
# a subscription and do not stuck in waiting for the first event,
# it should pass `ping` argument to return fake event to unblock
# `GET /?cmd=subscribe` call.
- yield 'data: %s\n\n' % json.dumps({'event': 'pong'})
+ yield {'event': 'pong'}
try:
while True:
@@ -158,7 +157,7 @@ class FrontRoutes(object):
elif event.get(key) != value:
break
else:
- yield 'data: %s\n\n' % json.dumps(event)
+ yield event
finally:
_logger.debug('Stop pulling events to %s user', peer)
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index ada63da..d57d3ce 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -306,6 +306,8 @@ class Connection(object):
def _decode_reply(self, reply):
if reply.headers.get('Content-Type') == 'application/json':
return json.loads(reply.content)
+ elif reply.headers.get('Content-Type') == 'text/event-stream':
+ return _pull_events(reply.raw)
else:
return reply.content
@@ -340,14 +342,7 @@ class _Subscription(object):
toolkit.exception('Failed to read from %r subscription, '
'will resubscribe', self._client.api_url)
self._content = None
-
- if line.startswith('data: '):
- try:
- return json.loads(line.split(' ', 1)[1])
- except Exception:
- toolkit.exception(
- 'Failed to parse %r event from %r subscription',
- line, self._client.api_url)
+ return _parse_event(line)
def _handshake(self, **params):
if self._content is not None:
@@ -360,6 +355,24 @@ class _Subscription(object):
return self._content
+def _pull_events(stream):
+ while True:
+ line = toolkit.readline(stream)
+ if not line:
+ break
+ event = _parse_event(line)
+ if event is not None:
+ yield event
+
+
+def _parse_event(line):
+ if line and line.startswith('data: '):
+ try:
+ return json.loads(line[6:])
+ except Exception:
+ _logger.exception('Failed to parse %r event', line)
+
+
def _sign(key_path, data):
import hashlib
from M2Crypto import DSA
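
_decode_reply() now turns a text/event-stream reply into a generator of parsed events via _pull_events()/_parse_event(). A standalone sketch of that pulling loop, assuming io.BytesIO in place of reply.raw and plain readline() in place of toolkit.readline(); pull_events() here is illustrative:

    import json
    from io import BytesIO

    def pull_events(stream):
        # Read SSE frames off a file-like stream and yield parsed dicts.
        for line in iter(stream.readline, b''):
            line = line.decode('utf-8')
            if line.startswith('data: '):
                yield json.loads(line[6:])

    raw = BytesIO(b'data: {"event": "pong"}\n\ndata: {"event": "exit"}\n\n')
    assert list(pull_events(raw)) == [{'event': 'pong'}, {'event': 'exit'}]
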
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 189556f..b7ba542 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -101,13 +101,13 @@ class Request(dict):
subcall = lambda *args: enforce(False)
def __init__(self, environ=None, method=None, path=None, cmd=None,
- content=None, content_type=None, **kwargs):
+ content=None, content_type=None, session=None, **kwargs):
dict.__init__(self)
self.path = []
self.cmd = None
self.environ = {}
- self.session = {}
+ self.session = session or {}
self._content = _NOT_SET
self._dirty_query = False
@@ -378,12 +378,13 @@ class Blob(dict):
class Router(object):
- def __init__(self, routes_model):
+ def __init__(self, routes_model, allow_spawn=False):
+ self._routes_model = routes_model
+ self._allow_spawn = allow_spawn
self._valid_origins = set()
self._invalid_origins = set()
self._host = None
self._routes = _Routes()
- self._routes_model = routes_model
self._preroutes = set()
self._postroutes = set()
@@ -534,8 +535,12 @@ class Router(object):
start_response(response.status, response.items())
if result_streamed:
- for i in result:
- yield i
+ if response.content_type == 'text/event-stream':
+ for event in _event_stream(request, result):
+ yield 'data: %s\n\n' % json.dumps(event)
+ else:
+ for i in result:
+ yield i
elif result is not None:
yield result
@@ -571,6 +576,11 @@ class Router(object):
exception = None
try:
result = route_.callback(**kwargs)
+ if route_.mime_type == 'text/event-stream' and \
+ self._allow_spawn and 'spawn' in request:
+ _logger.debug('Spawn event stream for %r', request)
+ coroutine.spawn(self._event_stream, request, result)
+ result = None
except Exception, exception:
raise
else:
@@ -618,6 +628,20 @@ class Router(object):
raise http.NotFound('Path not found')
return route_
+ def _event_stream(self, request, stream):
+ commons = {'method': request.method}
+ if request.cmd:
+ commons['cmd'] = request.cmd
+ if request.resource:
+ commons['resource'] = request.resource
+ if request.guid:
+ commons['guid'] = request.guid
+ if request.prop:
+ commons['prop'] = request.prop
+ for event in _event_stream(request, stream):
+ event.update(commons)
+ self._routes_model.broadcast(event)
+
def _assert_origin(self, environ):
origin = environ['HTTP_ORIGIN']
if origin in self._valid_origins:
@@ -691,6 +715,25 @@ def _stream_reader(stream):
stream.close()
+def _event_stream(request, stream):
+ try:
+ for event in stream:
+ if type(event) is tuple:
+ for i in event[1:]:
+ event[0].update(i)
+ event = event[0]
+ yield event
+ except Exception, error:
+ _logger.exception('Event stream %r failed', request)
+ event = {'event': 'failure',
+ 'exception': type(error).__name__,
+ 'error': str(error),
+ }
+ event.update(request.session)
+ yield event
+ _logger.debug('Event stream %r exited', request)
+
+
def _typecast(cast, value):
if cast is list or cast is tuple:
if isinstance(value, basestring):
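
The module-level _event_stream() above lets a route yield either a dict or a tuple of dicts (merged into a single event) and converts an uncaught exception into a final 'failure' event that carries request.session; with allow_spawn=True and a 'spawn' query argument, Router runs this generator in a background coroutine and broadcasts the events instead of returning them. A hedged standalone sketch of the merging and failure handling, with a plain dict standing in for request.session:

    def event_stream(stream, session):
        # Merge tuple events into one dict; turn exceptions into a failure event.
        try:
            for event in stream:
                if type(event) is tuple:
                    merged = dict(event[0])
                    for extra in event[1:]:
                        merged.update(extra)
                    event = merged
                yield event
        except Exception as error:
            event = {'event': 'failure',
                     'exception': type(error).__name__,
                     'error': str(error)}
            event.update(session)
            yield event

    def route():
        yield {'event': 'launch'}
        yield {'event': 'exec'}, {'pid': 1}
        raise RuntimeError('boom')

    assert list(event_stream(route(), {'activity_id': 'xx'})) == [
        {'event': 'launch'},
        {'event': 'exec', 'pid': 1},
        {'event': 'failure', 'exception': 'RuntimeError', 'error': 'boom',
         'activity_id': 'xx'},
    ]
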
diff --git a/tests/units/client/cache.py b/tests/units/client/cache.py
index be22a22..d8c2dde 100755
--- a/tests/units/client/cache.py
+++ b/tests/units/client/cache.py
@@ -226,16 +226,16 @@ class CacheTest(tests.Test):
'stability = stable',
]])), cmd='release', initial=True)
- conn.get(['context', 'context1'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'context1'], cmd='launch')][-1]['event'])
self.assertEqual([impl1], [i for i in self.client_routes._cache])
assert local_volume['implementation'].exists(impl1)
- conn.get(['context', 'context2'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'context2'], cmd='launch')][-1]['event'])
self.assertEqual([impl2, impl1], [i for i in self.client_routes._cache])
assert local_volume['implementation'].exists(impl1)
assert local_volume['implementation'].exists(impl2)
- conn.get(['context', 'context3'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'context3'], cmd='launch')][-1]['event'])
self.assertEqual([impl3, impl2, impl1], [i for i in self.client_routes._cache])
assert local_volume['implementation'].exists(impl1)
assert local_volume['implementation'].exists(impl2)
diff --git a/tests/units/client/implementations.py b/tests/units/client/implementations.py
index af12d01..3ed4922 100755
--- a/tests/units/client/implementations.py
+++ b/tests/units/client/implementations.py
@@ -86,7 +86,7 @@ class Implementations(tests.Test):
self.override(packagekit, 'resolve', resolve)
self.override(packagekit, 'install', install)
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
with file('resolve') as f:
deps = [pickle.load(f),
@@ -157,7 +157,7 @@ class Implementations(tests.Test):
}]]
cached_path = 'cache/solutions/bu/bundle_id'
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.assertEqual(solution, json.load(file(cached_path)))
os.utime(cached_path, (0, 0))
@@ -189,35 +189,35 @@ class Implementations(tests.Test):
self.touch([cached_path, solution])
cached_mtime = int(os.stat(cached_path).st_mtime)
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
client.api_url.value = 'fake'
- self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.assertEqual(solution, file(cached_path).read())
client.api_url.value = 'http://127.0.0.1:8888'
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
- self.client_routes._node_mtime = cached_mtime + 1
- self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch')
+ self.client_routes._node_mtime = cached_mtime + 2
+ self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.assertEqual(solution, file(cached_path).read())
self.client_routes._node_mtime = cached_mtime
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
- self.override(packagekit, 'mtime', lambda: cached_mtime + 1)
- self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch')
+ self.override(packagekit, 'mtime', lambda: cached_mtime + 2)
+ self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.assertEqual(solution, file(cached_path).read())
self.override(packagekit, 'mtime', lambda: cached_mtime)
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.touch(('config', [
'[stabilities]',
'bundle_id = buggy',
]))
Option.load(['config'])
- self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.assertEqual(solution, file(cached_path).read())
self.touch(('config', [
@@ -225,7 +225,7 @@ class Implementations(tests.Test):
'bundle_id = stable',
]))
Option.load(['config'])
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
def test_DeliberateReuseCachedSolutionInOffline(self):
self.start_online_client()
@@ -251,11 +251,11 @@ class Implementations(tests.Test):
self.touch(['cache/solutions/bu/bundle_id', solution])
client.api_url.value = 'fake'
- self.assertRaises(http.NotFound, conn.get, ['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.node.stop()
coroutine.sleep(.1)
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
def test_StabilityPreferences(self):
self.start_online_client()
@@ -293,7 +293,7 @@ class Implementations(tests.Test):
]])), cmd='release')
cached_path = 'cache/solutions/bu/bundle_id'
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.assertEqual('1', json.load(file(cached_path))[2][0]['version'])
self.touch(('config', [
@@ -301,7 +301,7 @@ class Implementations(tests.Test):
'bundle_id = testing',
]))
Option.load(['config'])
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.assertEqual('2', json.load(file(cached_path))[2][0]['version'])
self.touch(('config', [
@@ -309,7 +309,7 @@ class Implementations(tests.Test):
'bundle_id = testing buggy',
]))
Option.load(['config'])
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.assertEqual('3', json.load(file(cached_path))[2][0]['version'])
self.touch(('config', [
@@ -317,7 +317,7 @@ class Implementations(tests.Test):
'default = testing',
]))
Option.load(['config'])
- conn.get(['context', 'bundle_id'], cmd='launch')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.assertEqual('2', json.load(file(cached_path))[2][0]['version'])
def test_LaunchContext(self):
@@ -336,7 +336,7 @@ class Implementations(tests.Test):
]],
['TestActivity/bin/activity', [
'#!/bin/sh',
- 'cat $2',
+ 'cat $6',
]],
)), cmd='release', initial=True)
@@ -358,11 +358,10 @@ class Implementations(tests.Test):
'blob': StringIO('content'),
}})
- conn.get(['context', 'document'], cmd='launch', context='bundle_id')
+ self.assertEqual('exit', [i for i in conn.get(['context', 'document'], cmd='launch', context='bundle_id')][-1]['event'])
coroutine.sleep(.1)
self.assertEqual('content', file('.sugar/default/logs/bundle_id.log').read())
-
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/client/offline_routes.py b/tests/units/client/offline_routes.py
index 2a8692b..d3a11d8 100755
--- a/tests/units/client/offline_routes.py
+++ b/tests/units/client/offline_routes.py
@@ -297,22 +297,13 @@ class OfflineRoutes(tests.Test):
self.node.stop()
coroutine.sleep(.1)
- def read_events():
- for event in ipc.subscribe(event='!commit'):
- events.append(event)
- events = []
- coroutine.spawn(read_events)
- coroutine.dispatch()
-
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
-
log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'foo': 'bar', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'foo': 'bar', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'exit', 'activity_id': 'activity_id'},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
[client.api_url.value, ['stable'], solution],
@@ -321,26 +312,10 @@ class OfflineRoutes(tests.Test):
def test_ServiceUnavailableWhileSolving(self):
ipc = self.start_offline_client()
- def read_events():
- for event in ipc.subscribe(event='!commit'):
- events.append(event)
- events = []
- coroutine.spawn(read_events)
- coroutine.dispatch()
-
- self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', 'foo'], cmd='launch')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'GET',
- 'guid': 'foo',
- 'cmd': 'launch',
- 'resource': 'context',
- 'prop': None,
- 'exception': 'ServiceUnavailable',
- 'error': "Resource 'foo' does not exist in 'context'",
- },
- events[-1])
+ self.assertEqual([
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': "Resource 'foo' does not exist in 'context'"},
+ ],
+ [i for i in ipc.get(['context', 'foo'], cmd='launch')])
context = ipc.post(['context'], {
'type': 'activity',
@@ -348,22 +323,14 @@ class OfflineRoutes(tests.Test):
'summary': 'summary',
'description': 'description',
})
- self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'GET',
- 'guid': context,
- 'cmd': 'launch',
- 'resource': 'context',
- 'prop': None,
- 'exception': 'ServiceUnavailable',
- 'error': """\
+ self.assertEqual([
+ {'event': 'launch', 'activity_id': 'activity_id'},
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': """\
Can't find all required implementations:
- %s -> (problem)
- No known implementations at all""" % context,
- },
- events[-1])
+ No known implementations at all""" % context},
+ ],
+ [i for i in ipc.get(['context', context], cmd='launch')])
impl = ipc.post(['implementation'], {
'context': context,
@@ -379,25 +346,15 @@ Can't find all required implementations:
},
},
}})
-
- self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'GET',
- 'guid': context,
- 'cmd': 'launch',
- 'resource': 'context',
- 'prop': None,
- 'exception': 'ServiceUnavailable',
- 'error': """\
+ self.assertEqual([
+ {'event': 'launch', 'activity_id': 'activity_id'},
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': """\
Can't find all required implementations:
- %s -> 1 (%s)
- dep -> (problem)
- No known implementations at all""" % (context, impl),
- },
- events[-1])
-
+ No known implementations at all""" % (context, impl)},
+ ],
+ [i for i in ipc.get(['context', context], cmd='launch')])
assert not exists('cache/solutions/%s/%s' % (context[:2], context))
def test_ServiceUnavailableWhileInstalling(self):
@@ -441,42 +398,29 @@ Can't find all required implementations:
return dict([(i, {'name': i, 'pk_id': i, 'version': '0', 'arch': '*', 'installed': False}) for i in names])
self.override(packagekit, 'resolve', resolve)
- def read_events():
- for event in ipc.subscribe(event='!commit'):
- events.append(event)
- events = []
- coroutine.spawn(read_events)
- coroutine.dispatch()
-
- self.assertRaises(http.ServiceUnavailable, ipc.get, ['context', context], cmd='launch')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'GET',
- 'guid': context,
- 'cmd': 'launch',
- 'resource': 'context',
- 'prop': None,
- 'exception': 'ServiceUnavailable',
- 'error': 'Installation is not available in offline',
- 'solution': [
- { 'guid': impl,
- 'context': context,
- 'license': ['GPLv3+'],
- 'stability': 'stable',
- 'version': '1',
- 'command': ['true'],
- },
- { 'guid': 'dep',
- 'context': 'dep',
- 'install': [{'arch': '*', 'installed': False, 'name': 'dep.bin', 'pk_id': 'dep.bin', 'version': '0'}],
- 'license': None,
- 'stability': 'packaged',
- 'version': '0',
- },
- ],
- },
- events[-1])
+ self.assertEqual([
+ {'event': 'launch', 'activity_id': 'activity_id'},
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'ServiceUnavailable', 'error': 'Installation is not available in offline',
+ 'stability': ['stable'],
+ 'solution': [
+ { 'guid': impl,
+ 'context': context,
+ 'license': ['GPLv3+'],
+ 'stability': 'stable',
+ 'version': '1',
+ 'command': ['true'],
+ },
+ { 'guid': 'dep',
+ 'context': 'dep',
+ 'install': [{'arch': '*', 'installed': False, 'name': 'dep.bin', 'pk_id': 'dep.bin', 'version': '0'}],
+ 'license': None,
+ 'stability': 'packaged',
+ 'version': '0',
+ },
+ ],
+ },
+ ],
+ [i for i in ipc.get(['context', context], cmd='launch')])
def test_NoAuthors(self):
ipc = self.start_offline_client()
diff --git a/tests/units/client/online_routes.py b/tests/units/client/online_routes.py
index 79b01cb..f4e052e 100755
--- a/tests/units/client/online_routes.py
+++ b/tests/units/client/online_routes.py
@@ -4,6 +4,7 @@
import os
import json
import time
+import copy
import shutil
import zipfile
from cStringIO import StringIO
@@ -364,13 +365,11 @@ class OnlineRoutes(tests.Test):
def test_clone_Fails(self):
self.start_online_client([User, Context, Implementation])
conn = IPCConnection()
- events = []
- def read_events():
- for event in conn.subscribe(event='!commit'):
- events.append(event)
- coroutine.spawn(read_events)
- coroutine.dispatch()
+ self.assertEqual([
+ {'event': 'failure', 'exception': 'NotFound', 'error': "Resource 'foo' does not exist in 'context'"},
+ ],
+ [i for i in conn.put(['context', 'foo'], True, cmd='clone')])
context = conn.post(['context'], {
'type': 'activity',
@@ -379,22 +378,14 @@ class OnlineRoutes(tests.Test):
'description': 'description',
})
- self.assertRaises(http.NotFound, conn.put, ['context', context], True, cmd='clone')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'PUT',
- 'cmd': 'clone',
- 'resource': 'context',
- 'guid': context,
- 'prop': None,
- 'exception': 'NotFound',
- 'error': """\
+ self.assertEqual([
+ {'event': 'failure', 'exception': 'NotFound', 'error': """\
Can't find all required implementations:
- %s -> (problem)
- No known implementations at all""" % context,
- },
- events[-1])
+ No known implementations at all""" % context},
+ ],
+ [i for i in conn.put(['context', context], True, cmd='clone')])
+
assert not exists('cache/solutions/%s/%s' % (context[:2], context))
impl = conn.post(['implementation'], {
@@ -417,29 +408,22 @@ Can't find all required implementations:
},
}})
- self.assertRaises(http.NotFound, conn.put, ['context', context], True, cmd='clone')
- coroutine.dispatch()
- self.assertEqual({
- 'event': 'failure',
- 'method': 'PUT',
- 'cmd': 'clone',
- 'resource': 'context',
- 'guid': context,
- 'prop': None,
- 'exception': 'NotFound',
- 'error': 'BLOB does not exist',
- 'solution': [{
- 'command': ['echo'],
- 'context': context,
- 'guid': impl,
- 'license': ['GPLv3+'],
- 'extract': 'topdir',
- 'stability': 'stable',
- 'version': '1',
- 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl),
- }],
- },
- events[-1])
+ self.assertEqual([
+ {'event': 'failure', 'exception': 'NotFound', 'error': 'BLOB does not exist',
+ 'stability': ['stable'],
+ 'solution': [{
+ 'command': ['echo'],
+ 'context': context,
+ 'guid': impl,
+ 'license': ['GPLv3+'],
+ 'extract': 'topdir',
+ 'stability': 'stable',
+ 'version': '1',
+ 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl),
+ }],
+ },
+ ],
+ [i for i in conn.put(['context', context], True, cmd='clone')])
assert not exists('cache/solutions/%s/%s' % (context[:2], context))
def test_clone_Content(self):
@@ -470,8 +454,10 @@ Can't find all required implementations:
self.node_volume['implementation'].update(impl, {'data': {'blob': StringIO(blob), 'foo': 'bar'}})
clone_path = 'client/context/%s/%s/.clone' % (context[:2], context)
- ipc.put(['context', context], True, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ {'event': 'ready'},
+ ],
+ [i for i in ipc.put(['context', context], True, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -520,8 +506,9 @@ Can't find all required implementations:
assert exists(clone_path + '/data.blob')
assert not exists('cache/solutions/%s/%s' % (context[:2], context))
- ipc.put(['context', context], False, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ ],
+ [i for i in ipc.put(['context', context], False, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -554,8 +541,10 @@ Can't find all required implementations:
assert not lexists(clone_path)
assert not exists('cache/solutions/%s/%s' % (context[:2], context))
- ipc.put(['context', context], True, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ {'event': 'ready'},
+ ],
+ [i for i in ipc.put(['context', context], True, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -601,11 +590,14 @@ Can't find all required implementations:
'stability': 'stable',
'version': '1',
'command': ['true'],
- 'path': blob_path,
}]
+ downloaded_solution = copy.deepcopy(solution)
+ downloaded_solution[0]['path'] = blob_path
- ipc.put(['context', 'bundle_id'], True, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ {'event': 'ready'},
+ ],
+ [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -657,11 +649,12 @@ Can't find all required implementations:
self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read())
assert exists(clone_path + '/data.blob/activity/activity.info')
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api_url.value, ['stable'], downloaded_solution],
json.load(file('cache/solutions/bu/bundle_id')))
- ipc.put(['context', 'bundle_id'], False, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ ],
+ [i for i in ipc.put(['context', 'bundle_id'], False, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -700,11 +693,13 @@ Can't find all required implementations:
self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read())
assert not exists(clone_path)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api_url.value, ['stable'], downloaded_solution],
json.load(file('cache/solutions/bu/bundle_id')))
- ipc.put(['context', 'bundle_id'], True, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual([
+ {'event': 'ready'},
+ ],
+ [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')])
self.assertEqual({
'event': 'update',
@@ -717,7 +712,7 @@ Can't find all required implementations:
sorted(ipc.get(['context'], reply='layer')['result']))
assert exists(clone_path + '/data.blob/activity/activity.info')
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api_url.value, ['stable'], downloaded_solution],
json.load(file('cache/solutions/bu/bundle_id')))
def test_clone_ActivityWithStabilityPreferences(self):
@@ -749,7 +744,10 @@ Can't find all required implementations:
blob2 = self.zips(['TestActivity/activity/activity.info', activity_info2])
impl2 = ipc.upload(['implementation'], StringIO(blob2), cmd='release', initial=True)
- ipc.put(['context', 'bundle_id'], True, cmd='clone')
+ self.assertEqual(
+ 'ready',
+ [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event'])
+
coroutine.dispatch()
self.assertEqual({'layer': ['clone']}, ipc.get(['context', 'bundle_id'], reply='layer'))
self.assertEqual([impl1], [i.guid for i in local['implementation'].find()[0]])
@@ -761,8 +759,13 @@ Can't find all required implementations:
]))
Option.load(['config'])
- ipc.put(['context', 'bundle_id'], False, cmd='clone')
- ipc.put(['context', 'bundle_id'], True, cmd='clone')
+ self.assertEqual(
+ [],
+ [i for i in ipc.put(['context', 'bundle_id'], False, cmd='clone')])
+ self.assertEqual(
+ 'ready',
+ [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event'])
+
coroutine.dispatch()
self.assertEqual({'layer': ['clone']}, ipc.get(['context', 'bundle_id'], reply='layer'))
self.assertEqual([impl1, impl2], [i.guid for i in local['implementation'].find()[0]])
@@ -803,8 +806,9 @@ Can't find all required implementations:
},
ipc.head(['context', 'bundle_id'], cmd='clone'))
- ipc.put(['context', 'bundle_id'], True, cmd='clone')
- coroutine.dispatch()
+ self.assertEqual(
+ 'ready',
+ [i for i in ipc.put(['context', 'bundle_id'], True, cmd='clone')][-1]['event'])
blob_path = tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl)
self.assertEqual({
@@ -842,16 +846,6 @@ Can't find all required implementations:
impl = ipc.upload(['implementation'], StringIO(blob), cmd='release', initial=True)
coroutine.sleep(.1)
- def read_events():
- for event in ipc.subscribe(event='!commit'):
- events.append(event)
- events = []
- coroutine.spawn(read_events)
- coroutine.dispatch()
-
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
-
solution = [{
'guid': impl,
'context': 'bundle_id',
@@ -860,17 +854,19 @@ Can't find all required implementations:
'stability': 'stable',
'version': '1',
'command': ['true'],
- 'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl),
}]
+ downloaded_solution = copy.deepcopy(solution)
+ downloaded_solution[0]['path'] = tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl)
log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'exit', 'activity_id': 'activity_id'},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api_url.value, ['stable'], downloaded_solution],
json.load(file('cache/solutions/bu/bundle_id')))
blob = self.zips(['TestActivity/activity/activity.info', [
@@ -886,10 +882,6 @@ Can't find all required implementations:
coroutine.sleep(.1)
shutil.rmtree('cache/solutions')
- del events[:]
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
-
solution = [{
'guid': impl,
'context': 'bundle_id',
@@ -902,10 +894,11 @@ Can't find all required implementations:
}]
log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_1.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'exit', 'activity_id': 'activity_id'},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
[client.api_url.value, ['stable'], solution],
@@ -914,41 +907,56 @@ Can't find all required implementations:
self.node.stop()
coroutine.sleep(.1)
- del events[:]
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
-
log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_2.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'exit', 'activity_id': 'activity_id'},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
[client.api_url.value, ['stable'], solution],
json.load(file('cache/solutions/bu/bundle_id')))
shutil.rmtree('cache/solutions')
- del events[:]
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
-
log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id_3.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'exit', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['true', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'exit', 'activity_id': 'activity_id'},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
[client.api_url.value, ['stable'], solution],
json.load(file('cache/solutions/bu/bundle_id')))
- def test_launch_ActivityFailed(self):
+ def test_launch_Fails(self):
local = self.start_online_client([User, Context, Implementation])
ipc = IPCConnection()
+ self.assertEqual([
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'NotFound', 'error': "Resource 'foo' does not exist in 'context'"},
+ ],
+ [i for i in ipc.get(['context', 'foo'], cmd='launch')])
+
+ ipc.post(['context'], {
+ 'guid': 'bundle_id',
+ 'type': 'activity',
+ 'title': 'title',
+ 'summary': 'summary',
+ 'description': 'description',
+ })
+ self.assertEqual([
+        {'event': 'launch', 'activity_id': 'activity_id', 'foo': 'bar'},
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'NotFound', 'error': """\
+Can't find all required implementations:
+- bundle_id -> (problem)
+ No known implementations at all"""},
+ ],
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
+
activity_info = '\n'.join([
'[Activity]',
'name = TestActivity',
@@ -960,17 +968,6 @@ Can't find all required implementations:
])
blob = self.zips(['TestActivity/activity/activity.info', activity_info])
impl = ipc.upload(['implementation'], StringIO(blob), cmd='release', initial=True)
- coroutine.sleep(.1)
-
- def read_events():
- for event in ipc.subscribe(event='!commit'):
- events.append(event)
- events = []
- coroutine.spawn(read_events)
- coroutine.dispatch()
-
- ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')
- coroutine.sleep(.1)
solution = [{
'guid': impl,
@@ -982,12 +979,20 @@ Can't find all required implementations:
'command': ['false'],
'path': tests.tmpdir + '/client/implementation/%s/%s/data.blob' % (impl[:2], impl),
}]
- log_path = tests.tmpdir + '/.sugar/default/logs/bundle_id.log'
self.assertEqual([
- {'event': 'exec', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
- {'event': 'failure', 'error': 'Process exited with 1 status', 'cmd': 'launch', 'guid': 'bundle_id', 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'], 'foo': 'bar', 'activity_id': 'activity_id', 'log_path': log_path, 'solution': solution},
+ {'event': 'launch', 'foo': 'bar', 'activity_id': 'activity_id'},
+ {'event': 'exec', 'activity_id': 'activity_id'},
+ {'event': 'failure', 'activity_id': 'activity_id', 'exception': 'RuntimeError', 'error': 'Process exited with 1 status',
+ 'stability': ['stable'],
+ 'args': ['false', '-b', 'bundle_id', '-a', 'activity_id'],
+ 'solution': solution,
+ 'logs': [
+ tests.tmpdir + '/.sugar/default/logs/shell.log',
+ tests.tmpdir + '/.sugar/default/logs/sugar-network-client.log',
+ tests.tmpdir + '/.sugar/default/logs/bundle_id.log',
+ ]},
],
- events)
+ [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['implementation'].exists(impl)
self.assertEqual(
[client.api_url.value, ['stable'], solution],
diff --git a/tests/units/model/routes.py b/tests/units/model/routes.py
index 04491ee..9cad606 100755
--- a/tests/units/model/routes.py
+++ b/tests/units/model/routes.py
@@ -54,11 +54,6 @@ class RoutesTest(tests.Test):
def read_events():
for event in routes.subscribe(event='!commit'):
- if not event.strip():
- continue
- assert event.startswith('data: ')
- assert event.endswith('\n\n')
- event = json.loads(event[6:])
events.append(event)
job = coroutine.spawn(read_events)
@@ -84,8 +79,7 @@ class RoutesTest(tests.Test):
routes = model.FrontRoutes()
for event in routes.subscribe(ping=True):
break
- self.assertEqual('data: {"event": "pong"}\n\n', event)
-
+ self.assertEqual({'event': 'pong'}, event)
if __name__ == '__main__':
diff --git a/tests/units/node/node.py b/tests/units/node/node.py
index e163912..3506495 100755
--- a/tests/units/node/node.py
+++ b/tests/units/node/node.py
@@ -159,7 +159,7 @@ class NodeTest(tests.Test):
def subscribe():
for event in cp.subscribe():
- events.append(json.loads(event[6:]))
+ events.append(event)
events = []
coroutine.spawn(subscribe)
coroutine.dispatch()
@@ -184,7 +184,7 @@ class NodeTest(tests.Test):
def subscribe():
for event in cp.subscribe():
- events.append(json.loads(event[6:]))
+ events.append(event)
events = []
coroutine.spawn(subscribe)
coroutine.dispatch()
diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py
index b970a90..ad90cf5 100755
--- a/tests/units/toolkit/router.py
+++ b/tests/units/toolkit/router.py
@@ -10,7 +10,7 @@ from __init__ import tests, src_root
from sugar_network import db
from sugar_network.toolkit.router import Blob, Router, Request, _parse_accept_language, route, fallbackroute, preroute, postroute, _filename
-from sugar_network.toolkit import default_lang, http
+from sugar_network.toolkit import default_lang, http, coroutine
class RouterTest(tests.Test):
@@ -1266,6 +1266,94 @@ class RouterTest(tests.Test):
],
response)
+ def test_EventStream(self):
+
+ class Routes(object):
+
+ @route('GET', mime_type='text/event-stream')
+ def get(self):
+ yield None
+ yield 0
+ yield -1
+ yield '2'
+ yield {'3': 4}
+
+ reply = Router(Routes())({
+ 'PATH_INFO': '/',
+ 'REQUEST_METHOD': 'GET',
+ },
+ lambda status, headers: None)
+
+ self.assertEqual([
+ 'data: null\n\n',
+ 'data: 0\n\n',
+ 'data: -1\n\n',
+ 'data: "2"\n\n',
+ 'data: {"3": 4}\n\n',
+ ],
+ [i for i in reply])
+
+ def test_SpawnEventStream(self):
+ events = []
+
+ class Routes(object):
+
+ @route('GET', [None, None, None], cmd='cmd', mime_type='text/event-stream')
+ def ok(self):
+ yield {}
+ yield {'foo': 'bar'}
+
+ def broadcast(self, event):
+ events.append(event.copy())
+
+ reply = Router(Routes(), allow_spawn=True)({
+ 'PATH_INFO': '/resource/guid/prop',
+ 'REQUEST_METHOD': 'GET',
+ 'QUERY_STRING': 'cmd=cmd&spawn&arg',
+ },
+ lambda status, headers: None)
+ self.assertEqual([], [i for i in reply])
+
+ coroutine.sleep(.1)
+ self.assertEqual([
+ {'method': 'GET', 'resource': 'resource', 'guid': 'guid', 'prop': 'prop', 'cmd': 'cmd'},
+ {'method': 'GET', 'resource': 'resource', 'guid': 'guid', 'prop': 'prop', 'cmd': 'cmd', 'foo': 'bar'},
+ ],
+ events)
+ del events[:]
+
+ def test_SpawnEventStreamFailure(self):
+ events = []
+
+ class Routes(object):
+
+ @route('GET', mime_type='text/event-stream')
+ def error(self, request):
+ request.session['bar'] = 'foo'
+ yield {}
+ yield {'foo': 'bar'}, {'add': 'on'}
+ raise RuntimeError('error')
+
+ def broadcast(self, event):
+ events.append(event.copy())
+
+ reply = Router(Routes(), allow_spawn=True)({
+ 'PATH_INFO': '/',
+ 'REQUEST_METHOD': 'GET',
+ 'QUERY_STRING': 'spawn',
+ },
+ lambda status, headers: None)
+ self.assertEqual([], [i for i in reply])
+
+ coroutine.sleep(.1)
+ self.assertEqual([
+ {'method': 'GET'},
+ {'method': 'GET', 'foo': 'bar', 'add': 'on'},
+ {'method': 'GET', 'bar': 'foo', 'event': 'failure', 'exception': 'RuntimeError', 'error': 'error'},
+ ],
+ events)
+ del events[:]
+
if __name__ == '__main__':
tests.main()