Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/toolkit/http.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/toolkit/http.py')
-rw-r--r--sugar_network/toolkit/http.py50
1 files changed, 44 insertions, 6 deletions
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 368cd08..69a7acb 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -18,6 +18,7 @@
import os
import cgi
import json
+import time
import shutil
import logging
import hashlib
@@ -33,15 +34,20 @@ from sugar_network.zerosugar import Bundle
from active_toolkit.sockets import decode_multipart, BUFFER_SIZE
from sugar_network.toolkit import sugar
from sugar_network import local
-from active_toolkit import enforce
+from active_toolkit import coroutine, enforce
# Let toolkit.http work in concurrence
from gevent import monkey
# XXX No DNS because `toolkit.network.res_init()` doesn't work otherwise
monkey.patch_socket(dns=False)
+monkey.patch_select()
monkey.patch_ssl()
+monkey.patch_time()
+_RECONNECTION_NUMBER = 1
+_RECONNECTION_TIMEOUT = 3
+
_logger = logging.getLogger('http')
@@ -67,6 +73,15 @@ class Client(object):
self._session = Session(headers=headers, verify=verify, prefetch=False)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def close(self):
+ self._session.close()
+
def get(self, path_=None, **kwargs):
kwargs.update(self.params)
response = self.request('GET', path_, params=kwargs)
@@ -215,11 +230,34 @@ class Client(object):
return mime_type
def subscribe(self):
- response = self._decode_response(
- self.request('GET', params={'cmd': 'subscribe'}))
- for line in _readlines(response.raw):
- if line.startswith('data: '):
- yield json.loads(line.split(' ', 1)[1])
+
+ def handshake():
+ _logger.debug('Subscribe to %r', self.api_url)
+ return self.request('GET', params={'cmd': 'subscribe'}).raw
+
+ def pull_events(stream):
+ retries = _RECONNECTION_NUMBER
+ while True:
+ start_time = time.time()
+ try:
+ if stream is None:
+ stream = handshake()
+ for line in _readlines(stream):
+ if line.startswith('data: '):
+ yield json.loads(line.split(' ', 1)[1])
+ except Exception:
+ if time.time() - start_time > _RECONNECTION_TIMEOUT * 10:
+ retries = _RECONNECTION_NUMBER
+ if retries <= 0:
+ raise
+ _logger.debug('Re-subscribe to %r in %s second(s)',
+ self.api_url, _RECONNECTION_TIMEOUT)
+ self.close()
+ coroutine.sleep(_RECONNECTION_TIMEOUT)
+ retries -= 1
+ stream = None
+
+ return pull_events(handshake())
def _register(self):
self.request('POST', ['user'],