diff options
Diffstat (limited to 'sugar_network/client/routes.py')
-rw-r--r-- | sugar_network/client/routes.py | 119 |
1 files changed, 50 insertions, 69 deletions
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index c6ea6d2..50d8632 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -24,6 +24,7 @@ from sugar_network import db, client, node, toolkit, model from sugar_network.client import journal, releases from sugar_network.node.slave import SlaveRoutes from sugar_network.toolkit import netlink, mountpoints +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.router import ACL, Request, Response, Router from sugar_network.toolkit.router import route, fallbackroute from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce @@ -189,44 +190,38 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): yield {'event': 'done', 'guid': guid} @fallbackroute() - def fallback(self, request=None, response=None, method=None, path=None, - cmd=None, content=None, content_stream=None, content_type=None, - **kwargs): + def fallback(self, request=None, response=None, **kwargs): if request is None: - request = Request(method=method, path=path, cmd=cmd, - content=content, content_stream=content_stream, - content_type=content_type) + request = Request(**kwargs) if response is None: response = Response() - request.update(kwargs) - if self._inline.is_set(): - if client.layers.value and \ - request.resource in ('context', 'release'): - request.add('layer', *client.layers.value) - request.principal = self._auth.login - try: - reply = self._node.call(request, response) - if hasattr(reply, 'read'): - if response.relocations: - return reply - else: - return _ResponseStream(reply, self._restart_online) - else: - return reply - except (http.ConnectionError, IncompleteRead): + + if not self._inline.is_set(): + return self._local.call(request, response) + + if client.layers.value and request.resource in ('context', 'release'): + request.add('layer', *client.layers.value) + request.principal = self._auth.login + try: + reply = self._node.call(request, response) + if hasattr(reply, 'read'): if response.relocations: - raise - self._restart_online() - return self._local.call(request, response) - else: + return reply + else: + return _ResponseStream(reply, self._restart_online) + else: + return reply + except (http.ConnectionError, IncompleteRead): + if response.relocations: + raise + self._restart_online() return self._local.call(request, response) def _got_online(self): enforce(not self._inline.is_set()) _logger.debug('Got online on %r', self._node) self._inline.set() - self.broadcast({'event': 'inline', 'state': 'online'}) - self._local.volume.broadcast = None + this.localcast({'event': 'inline', 'state': 'online'}) def _got_offline(self, force=False): if not force and not self._inline.is_set(): @@ -235,9 +230,8 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): self._node.close() if self._inline.is_set(): _logger.debug('Got offline on %r', self._node) - self.broadcast({'event': 'inline', 'state': 'offline'}) + this.localcast({'event': 'inline', 'state': 'offline'}) self._inline.clear() - self._local.volume.broadcast = self.broadcast def _restart_online(self): _logger.debug('Lost %r connection, try to reconnect in %s seconds', @@ -266,16 +260,19 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): mtime = event.get('mtime') if mtime: self.invalidate_solutions(mtime) - self.broadcast(event) + this.broadcast(event) def handshake(url): _logger.debug('Connecting to %r node', url) self._node = client.Connection(url, auth=self._auth) status = self._node.get(cmd='status') self._auth.allow_basic_auth = (status.get('level') == 'master') + """ + TODO switch to seqno impl_info = status['resources'].get('release') if impl_info: self.invalidate_solutions(impl_info['mtime']) + """ if self._inline.is_set(): _logger.info('Reconnected to %r node', url) else: @@ -284,7 +281,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): def connect(): timeout = _RECONNECT_TIMEOUT while True: - self.broadcast({'event': 'inline', 'state': 'connecting'}) + this.localcast({'event': 'inline', 'state': 'connecting'}) for url in self._remote_urls: while True: try: @@ -329,8 +326,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): profile['guid'] = self._auth.login volume['user'].create(profile) - self._node = _NodeRoutes(join(db_path, 'node'), volume, - self.broadcast) + self._node = _NodeRoutes(join(db_path, 'node'), volume) self._jobs.spawn(volume.populate) logging.info('Start %r node on %s port', volume.root, node.port.value) @@ -364,6 +360,11 @@ class CachedClientRoutes(ClientRoutes): ClientRoutes._got_offline(self, force) def _push(self): + # TODO should work using regular pull/push + return + + + pushed_seq = toolkit.Sequence() skiped_seq = toolkit.Sequence() volume = self._local.volume @@ -388,24 +389,11 @@ class CachedClientRoutes(ClientRoutes): diff_seq = toolkit.Sequence() post_requests = [] for prop, meta, seqno in patch: - if 'blob' in meta: - request = Request(method='PUT', path=[res, guid, prop]) - request.content_type = meta['mime_type'] - request.content_length = os.stat(meta['blob']).st_size - request.content_stream = \ - toolkit.iter_file(meta['blob']) - post_requests.append((request, seqno)) - elif 'url' in meta: - request = Request(method='PUT', path=[res, guid, prop]) - request.content_type = 'application/json' - request.content = meta - post_requests.append((request, seqno)) - else: - value = meta['value'] - if prop == 'layer': - value = list(set(value) - _LOCAL_LAYERS) - diff[prop] = value - diff_seq.include(seqno, seqno) + value = meta['value'] + if prop == 'layer': + value = list(set(value) - _LOCAL_LAYERS) + diff[prop] = value + diff_seq.include(seqno, seqno) if not diff: continue if 'guid' in diff: @@ -426,7 +414,6 @@ class CachedClientRoutes(ClientRoutes): if not pushed_seq: if not self._push_seq.mtime: self._push_seq.commit() - self.broadcast({'event': 'push'}) return _logger.info('Pushed %r local cache', pushed_seq) @@ -441,38 +428,32 @@ class CachedClientRoutes(ClientRoutes): volume['report'].wipe() self._push_seq.commit() - self.broadcast({'event': 'push'}) -class _LocalRoutes(model.VolumeRoutes, Router): +class _LocalRoutes(db.Routes, Router): def __init__(self, volume): - model.VolumeRoutes.__init__(self, volume) + db.Routes.__init__(self, volume) Router.__init__(self, self) - def on_create(self, request, props, event): + def on_create(self, request, props): props['layer'] = tuple(props['layer']) + ('local',) - model.VolumeRoutes.on_create(self, request, props, event) + db.Routes.on_create(self, request, props) class _NodeRoutes(SlaveRoutes, Router): - def __init__(self, key_path, volume, localcast): + def __init__(self, key_path, volume): SlaveRoutes.__init__(self, key_path, volume) Router.__init__(self, self) self.api_url = 'http://127.0.0.1:%s' % node.port.value - self._localcast = localcast self._mounts = toolkit.Pool() self._jobs = coroutine.Pool() mountpoints.connect(_SYNC_DIRNAME, self.__found_mountcb, self.__lost_mount_cb) - def broadcast(self, event=None, request=None): - SlaveRoutes.broadcast(self, event, request) - self._localcast(event) - def close(self): self.volume.close() @@ -481,27 +462,27 @@ class _NodeRoutes(SlaveRoutes, Router): (self.volume.root, self.api_url) def _sync_mounts(self): - self._localcast({'event': 'sync_start'}) + this.localcast({'event': 'sync_start'}) for mountpoint in self._mounts: - self._localcast({'event': 'sync_next', 'path': mountpoint}) + this.localcast({'event': 'sync_next', 'path': mountpoint}) try: self._offline_session = self._offline_sync( join(mountpoint, _SYNC_DIRNAME), **(self._offline_session or {})) except Exception, error: _logger.exception('Failed to complete synchronization') - self._localcast({'event': 'sync_abort', 'error': str(error)}) + this.localcast({'event': 'sync_abort', 'error': str(error)}) self._offline_session = None raise if self._offline_session is None: _logger.debug('Synchronization completed') - self._localcast({'event': 'sync_complete'}) + this.localcast({'event': 'sync_complete'}) else: _logger.debug('Postpone synchronization with %r session', self._offline_session) - self._localcast({'event': 'sync_paused'}) + this.localcast({'event': 'sync_paused'}) def __found_mountcb(self, path): self._mounts.add(path) |