From e36b897cd6cf7e52370479127aa85218f6269b51 Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Mon, 11 Nov 2013 01:04:56 +0000 Subject: Close event stream connections on client exists --- diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index 6abb758..1701c69 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -87,7 +87,7 @@ class FrontRoutes(object): if response is not None: response.content_type = 'text/event-stream' response['Cache-Control'] = 'no-cache' - return self._pull_events(ping, condition) + return self._pull_events(request, ping, condition) @route('POST', cmd='broadcast', mime_type='application/json', acl=ACL.LOCAL) @@ -120,7 +120,9 @@ class FrontRoutes(object): 'mime_type': 'image/x-icon', }) - def _pull_events(self, ping, condition): + def _pull_events(self, request, ping, condition): + _logger.debug('Start subscription, total=%s', self._pooler.waiters + 1) + if ping: # XXX The whole commands' kwargs handling should be redesigned if 'ping' in condition: @@ -131,8 +133,18 @@ class FrontRoutes(object): # `GET /?cmd=subscribe` call. yield {'event': 'pong'} + rfile = None + if request is not None: + rfile = request.content_stream + coroutine.spawn(self._waiter_for_closing, rfile) + while True: event = self._pooler.wait() + if not isinstance(event, dict): + if event is rfile: + break + else: + continue for key, value in condition.items(): if value.startswith('!'): if event.get(key) == value[1:]: @@ -142,6 +154,14 @@ class FrontRoutes(object): else: yield event + _logger.debug('Stop subscription, total=%s', self._pooler.waiters) + + def _waiter_for_closing(self, rfile): + try: + coroutine.select([rfile.fileno()], [], []) + finally: + self._pooler.notify_all(rfile) + class _Pooler(object): """One-producer-to-many-consumers events delivery.""" @@ -153,6 +173,10 @@ class _Pooler(object): self._open = coroutine.Event() self._open.set() + @property + def waiters(self): + return self._waiters + def wait(self): self._open.wait() self._waiters += 1 diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 9dd11cd..885bca9 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -721,6 +721,9 @@ class _ContentStream(object): self._length = length self._pos = 0 + def fileno(self): + return self._stream.rfile.fileno() + def read(self, size=None): if self._length: the_rest = max(0, self._length - self._pos) diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py index bfd2330..7103584 100755 --- a/tests/units/client/routes.py +++ b/tests/units/client/routes.py @@ -193,6 +193,7 @@ class RoutesTest(tests.Test): trigger.wait() self.node_volume.close() + coroutine.sleep(1.1) volume['context'].update(guid1, {'title': 'title_'}) volume['context'].delete(guid2) -- cgit v0.9.1