From 7f1bdbda6000e96901ad01badbb58c6cbdf8a73c Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Thu, 27 Sep 2012 16:49:01 +0000 Subject: Reconnect after closing subscription socket, it might be triggered by server timeouts --- diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py index 3203c3b..a08e693 100644 --- a/sugar_network/local/mounts.py +++ b/sugar_network/local/mounts.py @@ -358,6 +358,8 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): finally: _logger.info('Got disconnected from %r master', url) _Mount.set_mounted(self, False) + self._client.close() + self._client = None class NodeMount(LocalMount, _ProxyCommands): diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py index 04c5723..ef74648 100644 --- a/sugar_network/local/mountset.py +++ b/sugar_network/local/mountset.py @@ -140,6 +140,8 @@ class Mountset(dict, ad.CommandsProcessor, Commands, SyncCommands): del self._subscriptions[callback] def publish(self, event): + _logger.debug('Publish event: %r', event) + for callback, condition in self._subscriptions.items(): for key, value in condition.items(): if event.get(key) != value: diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 368cd08..69a7acb 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -18,6 +18,7 @@ import os import cgi import json +import time import shutil import logging import hashlib @@ -33,15 +34,20 @@ from sugar_network.zerosugar import Bundle from active_toolkit.sockets import decode_multipart, BUFFER_SIZE from sugar_network.toolkit import sugar from sugar_network import local -from active_toolkit import enforce +from active_toolkit import coroutine, enforce # Let toolkit.http work in concurrence from gevent import monkey # XXX No DNS because `toolkit.network.res_init()` doesn't work otherwise monkey.patch_socket(dns=False) +monkey.patch_select() monkey.patch_ssl() +monkey.patch_time() +_RECONNECTION_NUMBER = 1 +_RECONNECTION_TIMEOUT = 3 + _logger = logging.getLogger('http') @@ -67,6 +73,15 @@ class Client(object): self._session = Session(headers=headers, verify=verify, prefetch=False) + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def close(self): + self._session.close() + def get(self, path_=None, **kwargs): kwargs.update(self.params) response = self.request('GET', path_, params=kwargs) @@ -215,11 +230,34 @@ class Client(object): return mime_type def subscribe(self): - response = self._decode_response( - self.request('GET', params={'cmd': 'subscribe'})) - for line in _readlines(response.raw): - if line.startswith('data: '): - yield json.loads(line.split(' ', 1)[1]) + + def handshake(): + _logger.debug('Subscribe to %r', self.api_url) + return self.request('GET', params={'cmd': 'subscribe'}).raw + + def pull_events(stream): + retries = _RECONNECTION_NUMBER + while True: + start_time = time.time() + try: + if stream is None: + stream = handshake() + for line in _readlines(stream): + if line.startswith('data: '): + yield json.loads(line.split(' ', 1)[1]) + except Exception: + if time.time() - start_time > _RECONNECTION_TIMEOUT * 10: + retries = _RECONNECTION_NUMBER + if retries <= 0: + raise + _logger.debug('Re-subscribe to %r in %s second(s)', + self.api_url, _RECONNECTION_TIMEOUT) + self.close() + coroutine.sleep(_RECONNECTION_TIMEOUT) + retries -= 1 + stream = None + + return pull_events(handshake()) def _register(self): self.request('POST', ['user'], diff --git a/tests/__init__.py b/tests/__init__.py index 2ec7f1d..9d7359a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -89,6 +89,7 @@ class Test(unittest.TestCase): stats.stats_rras.value = ['RRA:AVERAGE:0.5:1:100'] stats._cache.clear() obs._client = None + http._RECONNECTION_NUMBER = 0 Volume.RESOURCES = [ 'sugar_network.resources.user', -- cgit v0.9.1