Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2013-11-11 01:04:56 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2013-11-11 01:04:56 (GMT)
commite36b897cd6cf7e52370479127aa85218f6269b51 (patch)
treee3eadde9b13289cb74494ff093b883292b3cda14
parent0e1fdae2ede40b64027f8c68bd6a562e3b217f67 (diff)
Close event stream connections on client exists
-rw-r--r--sugar_network/model/routes.py28
-rw-r--r--sugar_network/toolkit/router.py3
-rwxr-xr-xtests/units/client/routes.py1
3 files changed, 30 insertions, 2 deletions
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index 6abb758..1701c69 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -87,7 +87,7 @@ class FrontRoutes(object):
if response is not None:
response.content_type = 'text/event-stream'
response['Cache-Control'] = 'no-cache'
- return self._pull_events(ping, condition)
+ return self._pull_events(request, ping, condition)
@route('POST', cmd='broadcast',
mime_type='application/json', acl=ACL.LOCAL)
@@ -120,7 +120,9 @@ class FrontRoutes(object):
'mime_type': 'image/x-icon',
})
- def _pull_events(self, ping, condition):
+ def _pull_events(self, request, ping, condition):
+ _logger.debug('Start subscription, total=%s', self._pooler.waiters + 1)
+
if ping:
# XXX The whole commands' kwargs handling should be redesigned
if 'ping' in condition:
@@ -131,8 +133,18 @@ class FrontRoutes(object):
# `GET /?cmd=subscribe` call.
yield {'event': 'pong'}
+ rfile = None
+ if request is not None:
+ rfile = request.content_stream
+ coroutine.spawn(self._waiter_for_closing, rfile)
+
while True:
event = self._pooler.wait()
+ if not isinstance(event, dict):
+ if event is rfile:
+ break
+ else:
+ continue
for key, value in condition.items():
if value.startswith('!'):
if event.get(key) == value[1:]:
@@ -142,6 +154,14 @@ class FrontRoutes(object):
else:
yield event
+ _logger.debug('Stop subscription, total=%s', self._pooler.waiters)
+
+ def _waiter_for_closing(self, rfile):
+ try:
+ coroutine.select([rfile.fileno()], [], [])
+ finally:
+ self._pooler.notify_all(rfile)
+
class _Pooler(object):
"""One-producer-to-many-consumers events delivery."""
@@ -153,6 +173,10 @@ class _Pooler(object):
self._open = coroutine.Event()
self._open.set()
+ @property
+ def waiters(self):
+ return self._waiters
+
def wait(self):
self._open.wait()
self._waiters += 1
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 9dd11cd..885bca9 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -721,6 +721,9 @@ class _ContentStream(object):
self._length = length
self._pos = 0
+ def fileno(self):
+ return self._stream.rfile.fileno()
+
def read(self, size=None):
if self._length:
the_rest = max(0, self._length - self._pos)
diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py
index bfd2330..7103584 100755
--- a/tests/units/client/routes.py
+++ b/tests/units/client/routes.py
@@ -193,6 +193,7 @@ class RoutesTest(tests.Test):
trigger.wait()
self.node_volume.close()
+ coroutine.sleep(1.1)
volume['context'].update(guid1, {'title': 'title_'})
volume['context'].delete(guid2)