diff options
Diffstat (limited to 'sugar_network/toolkit/router.py')
-rw-r--r-- | sugar_network/toolkit/router.py | 55 |
1 files changed, 49 insertions, 6 deletions
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 189556f..b7ba542 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -101,13 +101,13 @@ class Request(dict): subcall = lambda *args: enforce(False) def __init__(self, environ=None, method=None, path=None, cmd=None, - content=None, content_type=None, **kwargs): + content=None, content_type=None, session=None, **kwargs): dict.__init__(self) self.path = [] self.cmd = None self.environ = {} - self.session = {} + self.session = session or {} self._content = _NOT_SET self._dirty_query = False @@ -378,12 +378,13 @@ class Blob(dict): class Router(object): - def __init__(self, routes_model): + def __init__(self, routes_model, allow_spawn=False): + self._routes_model = routes_model + self._allow_spawn = allow_spawn self._valid_origins = set() self._invalid_origins = set() self._host = None self._routes = _Routes() - self._routes_model = routes_model self._preroutes = set() self._postroutes = set() @@ -534,8 +535,12 @@ class Router(object): start_response(response.status, response.items()) if result_streamed: - for i in result: - yield i + if response.content_type == 'text/event-stream': + for event in _event_stream(request, result): + yield 'data: %s\n\n' % json.dumps(event) + else: + for i in result: + yield i elif result is not None: yield result @@ -571,6 +576,11 @@ class Router(object): exception = None try: result = route_.callback(**kwargs) + if route_.mime_type == 'text/event-stream' and \ + self._allow_spawn and 'spawn' in request: + _logger.debug('Spawn event stream for %r', request) + coroutine.spawn(self._event_stream, request, result) + result = None except Exception, exception: raise else: @@ -618,6 +628,20 @@ class Router(object): raise http.NotFound('Path not found') return route_ + def _event_stream(self, request, stream): + commons = {'method': request.method} + if request.cmd: + commons['cmd'] = request.cmd + if request.resource: + commons['resource'] = request.resource + if request.guid: + commons['guid'] = request.guid + if request.prop: + commons['prop'] = request.prop + for event in _event_stream(request, stream): + event.update(commons) + self._routes_model.broadcast(event) + def _assert_origin(self, environ): origin = environ['HTTP_ORIGIN'] if origin in self._valid_origins: @@ -691,6 +715,25 @@ def _stream_reader(stream): stream.close() +def _event_stream(request, stream): + try: + for event in stream: + if type(event) is tuple: + for i in event[1:]: + event[0].update(i) + event = event[0] + yield event + except Exception, error: + _logger.exception('Event stream %r failed', request) + event = {'event': 'failure', + 'exception': type(error).__name__, + 'error': str(error), + } + event.update(request.session) + yield event + _logger.debug('Event stream %r exited', request) + + def _typecast(cast, value): if cast is list or cast is tuple: if isinstance(value, basestring): |