# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import os
import logging
from base64 import b64encode
from httplib import IncompleteRead
from zipfile import ZipFile, ZIP_DEFLATED
from os.path import join, basename
from sugar_network import db, client, node, toolkit, model
from sugar_network.client import journal, implementations
from sugar_network.node.slave import SlaveRoutes
from sugar_network.toolkit import netlink, mountpoints
from sugar_network.toolkit.router import ACL, Request, Response, Router
from sugar_network.toolkit.router import route, fallbackroute
from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce
# Top-level directory name to keep SN data on mounted devices
_SN_DIRNAME = 'sugar-network'
# Flag file to recognize a directory as a synchronization directory
_SYNC_DIRNAME = 'sugar-network-sync'
_RECONNECT_TIMEOUT = 3
_RECONNECT_TIMEOUT_MAX = 60 * 15
_LOCAL_LAYERS = frozenset(['local', 'clone', 'favorite'])
_logger = logging.getLogger('client.routes')
class ClientRoutes(model.FrontRoutes, implementations.Routes, journal.Routes):
def __init__(self, home_volume, api_url=None, no_subscription=False):
model.FrontRoutes.__init__(self)
implementations.Routes.__init__(self, home_volume)
journal.Routes.__init__(self)
self._local = _LocalRoutes(home_volume)
self._inline = coroutine.Event()
self._inline_job = coroutine.Pool()
self._remote_urls = []
self._node = None
self._jobs = coroutine.Pool()
self._no_subscription = no_subscription
self._server_mode = not api_url
self._api_url = api_url
self._auth = _Auth()
if not client.delayed_start.value:
self.connect()
def connect(self):
self._got_offline(force=True)
if self._server_mode:
enforce(not client.login.value)
mountpoints.connect(_SN_DIRNAME,
self._found_mount, self._lost_mount)
else:
if client.discover_server.value:
self._jobs.spawn(self._discover_node)
else:
self._remote_urls.append(self._api_url)
self._jobs.spawn(self._wait_for_connectivity)
def close(self):
self._jobs.kill()
self._got_offline()
self._local.volume.close()
@fallbackroute('GET', ['hub'])
def hub(self, request, response):
"""Serve Hub via HTTP instead of file:// for IPC users.
Since SSE doesn't support CORS for now.
"""
if request.environ['PATH_INFO'] == '/hub':
raise http.Redirect('/hub/')
path = request.path[1:]
if not path:
path = ['index.html']
path = join(client.hub_root.value, *path)
mtime = int(os.stat(path).st_mtime)
if request.if_modified_since >= mtime:
raise http.NotModified()
if path.endswith('.js'):
response.content_type = 'text/javascript'
if path.endswith('.css'):
response.content_type = 'text/css'
response.last_modified = mtime
return file(path, 'rb')
@fallbackroute('GET', ['packages'])
def route_packages(self, request, response):
if self._inline.is_set():
return self.fallback(request, response)
else:
# Let caller know that we are in offline and
# no way to process specified request on the node
raise http.ServiceUnavailable()
@route('GET', cmd='inline',
mime_type='application/json')
def inline(self):
return self._inline.is_set()
@route('GET', cmd='whoami', mime_type='application/json')
def whoami(self, request, response):
if self._inline.is_set():
result = self.fallback(request, response)
result['route'] = 'proxy'
else:
result = {'roles': [], 'route': 'offline'}
result['guid'] = self._auth.login
return result
@route('GET', [None],
arguments={
'offset': int,
'limit': int,
'reply': ('guid',),
'layer': list,
},
mime_type='application/json')
def find(self, request, response, layer):
if set(request.get('layer', [])) & set(['favorite', 'clone']):
return self._local.call(request, response)
reply = request.setdefault('reply', ['guid'])
if 'layer' not in reply:
return self.fallback(request, response)
if 'guid' not in reply:
# Otherwise there is no way to mixin local `layer`
reply.append('guid')
result = self.fallback(request, response)
directory = self._local.volume[request.resource]
for item in result['result']:
if directory.exists(item['guid']):
existing_layer = directory.get(item['guid'])['layer']
item['layer'][:] = set(item['layer']) | set(existing_layer)
return result
@route('GET', [None, None], mime_type='application/json')
def get(self, request, response):
if self._local.volume[request.resource].exists(request.guid):
return self._local.call(request, response)
else:
return self.fallback(request, response)
@route('GET', [None, None, None], mime_type='application/json')
def get_prop(self, request, response):
if self._local.volume[request.resource].exists(request.guid):
return self._local.call(request, response)
else:
return self.fallback(request, response)
@route('POST', ['report'], cmd='submit', mime_type='text/event-stream')
def submit_report(self, request, response):
logs = request.content.pop('logs')
guid = self.fallback(method='POST', path=['report'],
content=request.content, content_type='application/json')
if logs:
with toolkit.TemporaryFile() as tmpfile:
with ZipFile(tmpfile, 'w', ZIP_DEFLATED) as zipfile:
for path in logs:
zipfile.write(path, basename(path))
tmpfile.seek(0)
self.fallback(method='PUT', path=['report', guid, 'data'],
content_stream=tmpfile, content_type='application/zip')
yield {'event': 'done', 'guid': guid}
@fallbackroute()
def fallback(self, request=None, response=None, method=None, path=None,
cmd=None, content=None, content_stream=None, content_type=None,
**kwargs):
if request is None:
request = Request(method=method, path=path, cmd=cmd,
content=content, content_stream=content_stream,
content_type=content_type)
if response is None:
response = Response()
request.update(kwargs)
if self._inline.is_set():
if client.layers.value and \
request.resource in ('context', 'implementation'):
request.add('layer', *client.layers.value)
request.principal = self._auth.login
try:
reply = self._node.call(request, response)
if hasattr(reply, 'read'):
if response.relocations:
return reply
else:
return _ResponseStream(reply, self._restart_online)
else:
return reply
except (http.ConnectionError, IncompleteRead):
if response.relocations:
raise
self._restart_online()
return self._local.call(request, response)
else:
return self._local.call(request, response)
def _got_online(self):
enforce(not self._inline.is_set())
_logger.debug('Got online on %r', self._node)
self._inline.set()
self.broadcast({'event': 'inline', 'state': 'online'})
self._local.volume.broadcast = None
def _got_offline(self, force=False):
if not force and not self._inline.is_set():
return
if self._node is not None:
self._node.close()
if self._inline.is_set():
_logger.debug('Got offline on %r', self._node)
self.broadcast({'event': 'inline', 'state': 'offline'})
self._inline.clear()
self._local.volume.broadcast = self.broadcast
def _restart_online(self):
_logger.debug('Lost %r connection, try to reconnect in %s seconds',
self._node, _RECONNECT_TIMEOUT)
self._remote_connect(_RECONNECT_TIMEOUT)
def _discover_node(self):
for host in zeroconf.browse_workstations():
url = 'http://%s:%s' % (host, node.port.default)
if url not in self._remote_urls:
self._remote_urls.append(url)
self._remote_connect()
def _wait_for_connectivity(self):
for gw in netlink.wait_for_route():
if gw:
self._remote_connect()
else:
self._got_offline()
def _remote_connect(self, timeout=0):
def pull_events():
for event in self._node.subscribe():
if event.get('resource') == 'implementation':
mtime = event.get('mtime')
if mtime:
self.invalidate_solutions(mtime)
self.broadcast(event)
def handshake(url):
_logger.debug('Connecting to %r node', url)
self._node = client.Connection(url, auth=self._auth)
status = self._node.get(cmd='status')
self._auth.allow_basic_auth = (status.get('level') == 'master')
impl_info = status['resources'].get('implementation')
if impl_info:
self.invalidate_solutions(impl_info['mtime'])
if self._inline.is_set():
_logger.info('Reconnected to %r node', url)
else:
self._got_online()
def connect():
timeout = _RECONNECT_TIMEOUT
while True:
self.broadcast({'event': 'inline', 'state': 'connecting'})
for url in self._remote_urls:
while True:
try:
handshake(url)
timeout = _RECONNECT_TIMEOUT
if self._no_subscription:
return
pull_events()
except (http.BadGateway, http.GatewayTimeout):
_logger.debug('Retry %r on gateway error', url)
continue
except Exception:
exception(_logger, 'Connection to %r failed', url)
break
self._got_offline()
if not timeout:
break
_logger.debug('Try to reconect in %s seconds', timeout)
coroutine.sleep(timeout)
timeout *= _RECONNECT_TIMEOUT
timeout = min(timeout, _RECONNECT_TIMEOUT_MAX)
self._inline_job.kill()
self._inline_job.spawn_later(timeout, connect)
def _found_mount(self, root):
if self._inline.is_set():
_logger.debug('Found %r node mount but %r is already active',
root, self._node.volume.root)
return
_logger.debug('Found %r node mount', root)
db_path = join(root, _SN_DIRNAME, 'db')
node.data_root.value = db_path
node.stats_root.value = join(root, _SN_DIRNAME, 'stats')
node.files_root.value = join(root, _SN_DIRNAME, 'files')
volume = db.Volume(db_path, model.RESOURCES)
if not volume['user'].exists(self._auth.login):
profile = self._auth.profile()
profile['guid'] = self._auth.login
volume['user'].create(profile)
self._node = _NodeRoutes(join(db_path, 'node'), volume,
self.broadcast)
self._jobs.spawn(volume.populate)
logging.info('Start %r node on %s port', volume.root, node.port.value)
server = coroutine.WSGIServer(('0.0.0.0', node.port.value), self._node)
self._inline_job.spawn(server.serve_forever)
self._got_online()
def _lost_mount(self, root):
if not self._inline.is_set() or \
not self._node.volume.root.startswith(root):
return
_logger.debug('Lost %r node mount', root)
self._inline_job.kill()
self._got_offline()
class CachedClientRoutes(ClientRoutes):
def __init__(self, home_volume, api_url=None, no_subscription=False):
self._push_seq = toolkit.PersistentSequence(
join(home_volume.root, 'push.sequence'), [1, None])
self._push_job = coroutine.Pool()
ClientRoutes.__init__(self, home_volume, api_url, no_subscription)
def _got_online(self):
ClientRoutes._got_online(self)
self._push_job.spawn(self._push)
def _got_offline(self, force=True):
self._push_job.kill()
ClientRoutes._got_offline(self, force)
def _push(self):
pushed_seq = toolkit.Sequence()
skiped_seq = toolkit.Sequence()
volume = self._local.volume
def push(request, seq):
try:
self.fallback(request)
except Exception:
_logger.exception('Cannot push %r, will postpone', request)
skiped_seq.include(seq)
else:
pushed_seq.include(seq)
for res in volume.resources:
if volume.mtime(res) <= self._push_seq.mtime:
continue
_logger.debug('Check %r local cache to push', res)
for guid, patch in volume[res].diff(self._push_seq, layer='local'):
diff = {}
diff_seq = toolkit.Sequence()
post_requests = []
for prop, meta, seqno in patch:
if 'blob' in meta:
request = Request(method='PUT', path=[res, guid, prop])
request.content_type = meta['mime_type']
request.content_length = os.stat(meta['blob']).st_size
request.content_stream = \
toolkit.iter_file(meta['blob'])
post_requests.append((request, seqno))
elif 'url' in meta:
request = Request(method='PUT', path=[res, guid, prop])
request.content_type = 'application/json'
request.content = meta
post_requests.append((request, seqno))
else:
value = meta['value']
if prop == 'layer':
value = list(set(value) - _LOCAL_LAYERS)
diff[prop] = value
diff_seq.include(seqno, seqno)
if not diff:
continue
if 'guid' in diff:
request = Request(method='POST', path=[res])
access = ACL.CREATE | ACL.WRITE
else:
request = Request(method='PUT', path=[res, guid])
access = ACL.WRITE
for name in diff.keys():
if not (volume[res].metadata[name].acl & access):
del diff[name]
request.content_type = 'application/json'
request.content = diff
push(request, diff_seq)
for request, seqno in post_requests:
push(request, [[seqno, seqno]])
if not pushed_seq:
if not self._push_seq.mtime:
self._push_seq.commit()
self.broadcast({'event': 'push'})
return
_logger.info('Pushed %r local cache', pushed_seq)
self._push_seq.exclude(pushed_seq)
if not skiped_seq:
self._push_seq.stretch()
if 'report' in volume:
# No any decent reasons to keep fail reports after uploding.
# TODO The entire offlile synchronization should be improved,
# for now, it is possible to have a race here
volume['report'].wipe()
self._push_seq.commit()
self.broadcast({'event': 'push'})
class _LocalRoutes(model.VolumeRoutes, Router):
def __init__(self, volume):
model.VolumeRoutes.__init__(self, volume)
Router.__init__(self, self)
def on_create(self, request, props, event):
props['layer'] = tuple(props['layer']) + ('local',)
model.VolumeRoutes.on_create(self, request, props, event)
class _NodeRoutes(SlaveRoutes, Router):
def __init__(self, key_path, volume, localcast):
SlaveRoutes.__init__(self, key_path, volume)
Router.__init__(self, self)
self.api_url = 'http://127.0.0.1:%s' % node.port.value
self._localcast = localcast
self._mounts = toolkit.Pool()
self._jobs = coroutine.Pool()
mountpoints.connect(_SYNC_DIRNAME,
self.__found_mountcb, self.__lost_mount_cb)
def broadcast(self, event=None, request=None):
SlaveRoutes.broadcast(self, event, request)
self._localcast(event)
def close(self):
self.volume.close()
def __repr__(self):
return '' % \
(self.volume.root, self.api_url)
def _sync_mounts(self):
self._localcast({'event': 'sync_start'})
for mountpoint in self._mounts:
self._localcast({'event': 'sync_next', 'path': mountpoint})
try:
self._offline_session = self._offline_sync(
join(mountpoint, _SYNC_DIRNAME),
**(self._offline_session or {}))
except Exception, error:
_logger.exception('Failed to complete synchronization')
self._localcast({'event': 'sync_abort', 'error': str(error)})
self._offline_session = None
raise
if self._offline_session is None:
_logger.debug('Synchronization completed')
self._localcast({'event': 'sync_complete'})
else:
_logger.debug('Postpone synchronization with %r session',
self._offline_session)
self._localcast({'event': 'sync_paused'})
def __found_mountcb(self, path):
self._mounts.add(path)
if self._jobs:
_logger.debug('Found %r sync mount, pool it', path)
else:
_logger.debug('Found %r sync mount, start synchronization', path)
self._jobs.spawn(self._sync_mounts)
def __lost_mount_cb(self, path):
if self._mounts.remove(path) == toolkit.Pool.ACTIVE:
_logger.warning('%r was unmounted, break synchronization', path)
self._jobs.kill()
class _ResponseStream(object):
def __init__(self, stream, on_fail_cb):
self._stream = stream
self._on_fail_cb = on_fail_cb
def __hasattr__(self, name):
return hasattr(self._stream, name)
def __getattr__(self, name):
return getattr(self._stream, name)
def read(self, size=None):
try:
return self._stream.read(size)
except (http.ConnectionError, IncompleteRead):
self._on_fail_cb()
raise
class _Auth(http.SugarAuth):
def __init__(self):
http.SugarAuth.__init__(self, client.keyfile.value)
if client.login.value:
self._login = client.login.value
self.allow_basic_auth = False
def profile(self):
if self.allow_basic_auth and \
client.login.value and client.password.value:
return None
import gconf
conf = gconf.client_get_default()
self._profile['name'] = conf.get_string('/desktop/sugar/user/nick')
return http.SugarAuth.profile(self)
def __call__(self, nonce):
if not self.allow_basic_auth or \
not client.login.value or not client.password.value:
return http.SugarAuth.__call__(self, nonce)
auth = b64encode('%s:%s' % (client.login.value, client.password.value))
return 'Basic %s' % auth