Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/client/routes.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/client/routes.py')
-rw-r--r--sugar_network/client/routes.py119
1 files changed, 50 insertions, 69 deletions
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py
index c6ea6d2..50d8632 100644
--- a/sugar_network/client/routes.py
+++ b/sugar_network/client/routes.py
@@ -24,6 +24,7 @@ from sugar_network import db, client, node, toolkit, model
from sugar_network.client import journal, releases
from sugar_network.node.slave import SlaveRoutes
from sugar_network.toolkit import netlink, mountpoints
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit.router import ACL, Request, Response, Router
from sugar_network.toolkit.router import route, fallbackroute
from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce
@@ -189,44 +190,38 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
yield {'event': 'done', 'guid': guid}
@fallbackroute()
- def fallback(self, request=None, response=None, method=None, path=None,
- cmd=None, content=None, content_stream=None, content_type=None,
- **kwargs):
+ def fallback(self, request=None, response=None, **kwargs):
if request is None:
- request = Request(method=method, path=path, cmd=cmd,
- content=content, content_stream=content_stream,
- content_type=content_type)
+ request = Request(**kwargs)
if response is None:
response = Response()
- request.update(kwargs)
- if self._inline.is_set():
- if client.layers.value and \
- request.resource in ('context', 'release'):
- request.add('layer', *client.layers.value)
- request.principal = self._auth.login
- try:
- reply = self._node.call(request, response)
- if hasattr(reply, 'read'):
- if response.relocations:
- return reply
- else:
- return _ResponseStream(reply, self._restart_online)
- else:
- return reply
- except (http.ConnectionError, IncompleteRead):
+
+ if not self._inline.is_set():
+ return self._local.call(request, response)
+
+ if client.layers.value and request.resource in ('context', 'release'):
+ request.add('layer', *client.layers.value)
+ request.principal = self._auth.login
+ try:
+ reply = self._node.call(request, response)
+ if hasattr(reply, 'read'):
if response.relocations:
- raise
- self._restart_online()
- return self._local.call(request, response)
- else:
+ return reply
+ else:
+ return _ResponseStream(reply, self._restart_online)
+ else:
+ return reply
+ except (http.ConnectionError, IncompleteRead):
+ if response.relocations:
+ raise
+ self._restart_online()
return self._local.call(request, response)
def _got_online(self):
enforce(not self._inline.is_set())
_logger.debug('Got online on %r', self._node)
self._inline.set()
- self.broadcast({'event': 'inline', 'state': 'online'})
- self._local.volume.broadcast = None
+ this.localcast({'event': 'inline', 'state': 'online'})
def _got_offline(self, force=False):
if not force and not self._inline.is_set():
@@ -235,9 +230,8 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
self._node.close()
if self._inline.is_set():
_logger.debug('Got offline on %r', self._node)
- self.broadcast({'event': 'inline', 'state': 'offline'})
+ this.localcast({'event': 'inline', 'state': 'offline'})
self._inline.clear()
- self._local.volume.broadcast = self.broadcast
def _restart_online(self):
_logger.debug('Lost %r connection, try to reconnect in %s seconds',
@@ -266,16 +260,19 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
mtime = event.get('mtime')
if mtime:
self.invalidate_solutions(mtime)
- self.broadcast(event)
+ this.broadcast(event)
def handshake(url):
_logger.debug('Connecting to %r node', url)
self._node = client.Connection(url, auth=self._auth)
status = self._node.get(cmd='status')
self._auth.allow_basic_auth = (status.get('level') == 'master')
+ """
+ TODO switch to seqno
impl_info = status['resources'].get('release')
if impl_info:
self.invalidate_solutions(impl_info['mtime'])
+ """
if self._inline.is_set():
_logger.info('Reconnected to %r node', url)
else:
@@ -284,7 +281,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
def connect():
timeout = _RECONNECT_TIMEOUT
while True:
- self.broadcast({'event': 'inline', 'state': 'connecting'})
+ this.localcast({'event': 'inline', 'state': 'connecting'})
for url in self._remote_urls:
while True:
try:
@@ -329,8 +326,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
profile['guid'] = self._auth.login
volume['user'].create(profile)
- self._node = _NodeRoutes(join(db_path, 'node'), volume,
- self.broadcast)
+ self._node = _NodeRoutes(join(db_path, 'node'), volume)
self._jobs.spawn(volume.populate)
logging.info('Start %r node on %s port', volume.root, node.port.value)
@@ -364,6 +360,11 @@ class CachedClientRoutes(ClientRoutes):
ClientRoutes._got_offline(self, force)
def _push(self):
+ # TODO should work using regular pull/push
+ return
+
+
+
pushed_seq = toolkit.Sequence()
skiped_seq = toolkit.Sequence()
volume = self._local.volume
@@ -388,24 +389,11 @@ class CachedClientRoutes(ClientRoutes):
diff_seq = toolkit.Sequence()
post_requests = []
for prop, meta, seqno in patch:
- if 'blob' in meta:
- request = Request(method='PUT', path=[res, guid, prop])
- request.content_type = meta['mime_type']
- request.content_length = os.stat(meta['blob']).st_size
- request.content_stream = \
- toolkit.iter_file(meta['blob'])
- post_requests.append((request, seqno))
- elif 'url' in meta:
- request = Request(method='PUT', path=[res, guid, prop])
- request.content_type = 'application/json'
- request.content = meta
- post_requests.append((request, seqno))
- else:
- value = meta['value']
- if prop == 'layer':
- value = list(set(value) - _LOCAL_LAYERS)
- diff[prop] = value
- diff_seq.include(seqno, seqno)
+ value = meta['value']
+ if prop == 'layer':
+ value = list(set(value) - _LOCAL_LAYERS)
+ diff[prop] = value
+ diff_seq.include(seqno, seqno)
if not diff:
continue
if 'guid' in diff:
@@ -426,7 +414,6 @@ class CachedClientRoutes(ClientRoutes):
if not pushed_seq:
if not self._push_seq.mtime:
self._push_seq.commit()
- self.broadcast({'event': 'push'})
return
_logger.info('Pushed %r local cache', pushed_seq)
@@ -441,38 +428,32 @@ class CachedClientRoutes(ClientRoutes):
volume['report'].wipe()
self._push_seq.commit()
- self.broadcast({'event': 'push'})
-class _LocalRoutes(model.VolumeRoutes, Router):
+class _LocalRoutes(db.Routes, Router):
def __init__(self, volume):
- model.VolumeRoutes.__init__(self, volume)
+ db.Routes.__init__(self, volume)
Router.__init__(self, self)
- def on_create(self, request, props, event):
+ def on_create(self, request, props):
props['layer'] = tuple(props['layer']) + ('local',)
- model.VolumeRoutes.on_create(self, request, props, event)
+ db.Routes.on_create(self, request, props)
class _NodeRoutes(SlaveRoutes, Router):
- def __init__(self, key_path, volume, localcast):
+ def __init__(self, key_path, volume):
SlaveRoutes.__init__(self, key_path, volume)
Router.__init__(self, self)
self.api_url = 'http://127.0.0.1:%s' % node.port.value
- self._localcast = localcast
self._mounts = toolkit.Pool()
self._jobs = coroutine.Pool()
mountpoints.connect(_SYNC_DIRNAME,
self.__found_mountcb, self.__lost_mount_cb)
- def broadcast(self, event=None, request=None):
- SlaveRoutes.broadcast(self, event, request)
- self._localcast(event)
-
def close(self):
self.volume.close()
@@ -481,27 +462,27 @@ class _NodeRoutes(SlaveRoutes, Router):
(self.volume.root, self.api_url)
def _sync_mounts(self):
- self._localcast({'event': 'sync_start'})
+ this.localcast({'event': 'sync_start'})
for mountpoint in self._mounts:
- self._localcast({'event': 'sync_next', 'path': mountpoint})
+ this.localcast({'event': 'sync_next', 'path': mountpoint})
try:
self._offline_session = self._offline_sync(
join(mountpoint, _SYNC_DIRNAME),
**(self._offline_session or {}))
except Exception, error:
_logger.exception('Failed to complete synchronization')
- self._localcast({'event': 'sync_abort', 'error': str(error)})
+ this.localcast({'event': 'sync_abort', 'error': str(error)})
self._offline_session = None
raise
if self._offline_session is None:
_logger.debug('Synchronization completed')
- self._localcast({'event': 'sync_complete'})
+ this.localcast({'event': 'sync_complete'})
else:
_logger.debug('Postpone synchronization with %r session',
self._offline_session)
- self._localcast({'event': 'sync_paused'})
+ this.localcast({'event': 'sync_paused'})
def __found_mountcb(self, path):
self._mounts.add(path)