diff options
Diffstat (limited to 'sugar_network/client')
-rw-r--r-- | sugar_network/client/cache.py | 46 | ||||
-rw-r--r-- | sugar_network/client/journal.py | 10 | ||||
-rw-r--r-- | sugar_network/client/releases.py | 167 | ||||
-rw-r--r-- | sugar_network/client/routes.py | 119 | ||||
-rw-r--r-- | sugar_network/client/solver.py | 33 |
5 files changed, 182 insertions, 193 deletions
diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py index e13ccb6..df76a29 100644 --- a/sugar_network/client/cache.py +++ b/sugar_network/client/cache.py @@ -17,9 +17,10 @@ import os import sys import time import logging -from os.path import exists, basename +from os.path import exists from sugar_network import client +from sugar_network.db import files from sugar_network.toolkit import pylru, enforce @@ -30,8 +31,7 @@ _logger = logging.getLogger('cache') class Cache(object): - def __init__(self, volume): - self._volume = volume + def __init__(self): self._pool = None self._du = 0 self._acquired = {} @@ -71,14 +71,18 @@ class Cache(object): self.checkin(guid, acquired[1]) del self._acquired[guid] - def checkin(self, guid, size): + def checkin(self, digest, size): self._ensure_open() - if guid in self._pool: - self._pool.__getitem__(guid) + if digest in self._pool: + self._pool.__getitem__(digest) return + + + _logger.debug('Checkin %r %d bytes long', guid, size) - mtime = os.stat(self._volume['release'].path(guid)).st_mtime - self._pool[guid] = (size, mtime) + + mtime = os.stat(files.get(digest).path).st_mtime + self._pool[digest] = (size, mtime) self._du += size def checkout(self, guid, *args): @@ -112,17 +116,25 @@ class Cache(object): _logger.debug('Open releases pool') pool = [] - impls = self._volume['release'] - for res in impls.find(not_layer=['local'])[0]: - meta = res.meta('data') - if not meta or 'blob_size' not in meta: - continue - clone = self._volume['context'].path(res['context'], '.clone') - if exists(clone) and basename(os.readlink(clone)) == res.guid: + for release in self._volume['release'].find(not_layer=['local'])[0]: + meta = files.get(release['data']) + if not meta: continue + + """ + TODO + + solution_path = client.path('solutions', release['context']) + if exists(solution_path): + with file(path) as f: + cached_api_url, cached_stability, solution = json.load(f) + if solution[0]['guid'] == release['guid']: + continue + + """ pool.append(( - os.stat(impls.path(res.guid)).st_mtime, - res.guid, + os.stat(meta.path).st_mtime, + release.guid, meta.get('unpack_size') or meta['blob_size'], )) diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py index ee2a2f3..0dcae12 100644 --- a/sugar_network/client/journal.py +++ b/sugar_network/client/journal.py @@ -19,8 +19,8 @@ import logging from shutil import copyfileobj from tempfile import NamedTemporaryFile -from sugar_network import client -from sugar_network.toolkit.router import Blob, route, Request +from sugar_network import client, toolkit +from sugar_network.toolkit.router import route, Request from sugar_network.toolkit import enforce @@ -105,15 +105,13 @@ class Routes(object): @route('GET', ['journal', None, 'preview']) def journal_get_preview(self, request, response): - return Blob({ - 'blob': _prop_path(request.guid, 'preview'), + return toolkit.File(_prop_path(request.guid, 'preview'), { 'mime_type': 'image/png', }) @route('GET', ['journal', None, 'data']) def journal_get_data(self, request, response): - return Blob({ - 'blob': _ds_path(request.guid, 'data'), + return toolkit.File(_ds_path(request.guid, 'data'), { 'mime_type': get(request.guid, 'mime_type') or 'application/octet', }) diff --git a/sugar_network/client/releases.py b/sugar_network/client/releases.py index ff35d16..c93a91a 100644 --- a/sugar_network/client/releases.py +++ b/sugar_network/client/releases.py @@ -32,7 +32,8 @@ from sugar_network.client.cache import Cache from sugar_network.client import journal, packagekit from sugar_network.toolkit.router import Request, Response, route from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit import http, coroutine, enforce +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import i18n, http, coroutine, enforce _MIMETYPE_DEFAULTS_KEY = '/desktop/sugar/journal/defaults' @@ -43,22 +44,20 @@ _logger = logging.getLogger('releases') class Routes(object): - def __init__(self, local_volume): - self._volume = local_volume + def __init__(self): self._node_mtime = None self._call = lambda **kwargs: \ self._map_exceptions(self.fallback, **kwargs) - self._cache = Cache(local_volume) + self._cache = Cache() def invalidate_solutions(self, mtime): self._node_mtime = mtime @route('GET', ['context', None], cmd='path') def path(self, request): - clone_path = self._volume['context'].path(request.guid, '.clone') - enforce(exists(clone_path), http.NotFound) - clone_impl = basename(os.readlink(clone_path)) - return self._volume['release'].path(clone_impl, 'data') + clone = self._solve(request) + enforce(clone is not None, http.NotFound, 'No clones') + return clone['path'] @route('GET', ['context', None], cmd='launch', arguments={'args': list}, mime_type='text/event-stream') @@ -75,18 +74,18 @@ class Routes(object): acquired = [] try: - impl = self._solve_impl(context, request) + impl = self._solve(request, context['type']) if 'activity' not in context['type']: app = request.get('context') or \ _mimetype_context(impl['data']['mime_type']) enforce(app, 'Cannot find proper application') - acquired += self._checkin_impl( + acquired += self._checkin( context, request, self._cache.acquire) request = Request(path=['context', app], object_id=impl['path'], session=request.session) for context in self._checkin_context(request): - impl = self._solve_impl(context, request) - acquired += self._checkin_impl( + impl = self._solve(request, context['type']) + acquired += self._checkin( context, request, self._cache.acquire) child = _exec(context, request, impl) @@ -105,19 +104,15 @@ class Routes(object): enforce(not request.content or self.inline(), http.ServiceUnavailable, 'Not available in offline') for context in self._checkin_context(request, 'clone'): - cloned_path = context.path('.clone') if request.content: - impl = self._solve_impl(context, request) - self._checkin_impl(context, request, self._cache.checkout) - impl_path = relpath(dirname(impl['path']), context.path()) - os.symlink(impl_path, cloned_path) + impl = self._solve(request, context['type']) + self._checkin(context, request, self._cache.checkout) yield {'event': 'ready'} else: - cloned_impl = basename(os.readlink(cloned_path)) - meta = self._volume['release'].get(cloned_impl).meta('data') + clone = self._solve(request) + meta = this.volume['release'].get(clone['guid']).meta('data') size = meta.get('unpack_size') or meta['blob_size'] - self._cache.checkin(cloned_impl, size) - os.unlink(cloned_path) + self._cache.checkin(clone['guid'], size) @route('GET', ['context', None], cmd='clone', arguments={'requires': list}) @@ -147,18 +142,14 @@ class Routes(object): raise http.ServiceUnavailable, error, sys.exc_info()[2] def _checkin_context(self, request, layer=None): - contexts = self._volume['context'] + contexts = this.volume['context'] guid = request.guid if layer and not request.content and not contexts.exists(guid): return if not contexts.exists(guid): - context = self._call(method='GET', path=['context', guid]) - contexts.create(context, setters=True) - for prop in ('icon', 'artifact_icon', 'logo'): - blob = self._call(method='GET', path=['context', guid, prop]) - if blob is not None: - contexts.update(guid, {prop: {'blob': blob}}) + patch = self._call(method='GET', path=['context', guid], cmd='diff') + contexts.merge(guid, patch) context = contexts.get(guid) if layer and bool(request.content) == (layer in context['layer']): return @@ -171,14 +162,9 @@ class Routes(object): else: layer_value = set(context['layer']) - set([layer]) contexts.update(guid, {'layer': list(layer_value)}) - self.broadcast({ - 'event': 'update', - 'resource': 'context', - 'guid': guid, - }) _logger.debug('Checked %r in: %r', guid, layer_value) - def _solve_impl(self, context, request): + def _solve(self, request, force_type=None): stability = request.get('stability') or \ client.stability(request.guid) @@ -193,9 +179,11 @@ class Routes(object): solution, stale = self._cache_solution_get(request.guid, stability) if stale is False: _logger.debug('Reuse cached %r solution', request.guid) - elif solution is not None and not self.inline(): - _logger.debug('Reuse stale %r in offline', request.guid) - elif 'activity' in context['type']: + elif solution is not None and (not force_type or not self.inline()): + _logger.debug('Reuse stale %r solution', request.guid) + elif not force_type: + return None + elif 'activity' in force_type: from sugar_network.client import solver solution = self._map_exceptions(solver.solve, self.fallback, request.guid, stability) @@ -203,16 +191,18 @@ class Routes(object): response = Response() blob = self._call(method='GET', path=['context', request.guid], cmd='clone', stability=stability, response=response) - response.meta['data']['blob'] = blob - solution = [response.meta] + release = response.meta + release['mime_type'] = response.content_type + release['size'] = response.content_length + files.post(blob, digest=release['spec']['*-*']['bundle']) + solution = [release] request.session['solution'] = solution return solution[0] - def _checkin_impl(self, context, request, cache_call): + def _checkin(self, context, request, cache_call): if 'clone' in context['layer']: cache_call = self._cache.checkout - impls = self._volume['release'] if 'activity' in context['type']: to_install = [] @@ -226,49 +216,42 @@ class Routes(object): def cache_impl(sel): guid = sel['guid'] - data = sel['data'] - sel['path'] = impls.path(guid, 'data') - size = data.get('unpack_size') or data['blob_size'] - - blob = None - if 'blob' in data: - blob = data.pop('blob') - - if impls.exists(guid): - return cache_call(guid, size) - - if blob is None: - blob = self._call(method='GET', path=['release', guid, 'data']) - - blob_dir = dirname(sel['path']) - if not exists(blob_dir): - os.makedirs(blob_dir) - - with toolkit.mkdtemp(dir=blob_dir) as blob_dir: - if 'activity' in context['type']: - self._cache.ensure(size, data['blob_size']) - with toolkit.TemporaryFile() as tmp_file: - shutil.copyfileobj(blob, tmp_file) - tmp_file.seek(0) - with Bundle(tmp_file, 'application/zip') as bundle: - bundle.extractall(blob_dir, prefix=bundle.rootdir) - for exec_dir in ('bin', 'activity'): - bin_path = join(blob_dir, exec_dir) - if not exists(bin_path): - continue - for filename in os.listdir(bin_path): - os.chmod(join(bin_path, filename), 0755) - blob = blob_dir - else: - self._cache.ensure(size) - with file(join(blob_dir, 'data'), 'wb') as f: - shutil.copyfileobj(blob, f) - blob = f.name - impl = deepcopy(sel) - impl['mtime'] = impl['ctime'] - impl['data']['blob'] = blob - impls.create(impl) - return cache_call(guid, size) + + + + + data = files.get(guid) + + if data is not None: + return cache_call(guid, data['unpack_size']) + + response = Response() + blob = self._call(method='GET', path=['release', guid, 'data'], + response=response) + + if 'activity' not in context['type']: + self._cache.ensure(response.content_length) + files.post(blob, response.meta, sel['data']) + return cache_call(guid, response.content_length) + + with toolkit.mkdtemp(dir=files.path(sel['data'])) as blob_dir: + self._cache.ensure( + response.meta['unpack_size'], + response.content_length) + with toolkit.TemporaryFile() as tmp_file: + shutil.copyfileobj(blob, tmp_file) + tmp_file.seek(0) + with Bundle(tmp_file, 'application/zip') as bundle: + bundle.extractall(blob_dir, prefix=bundle.rootdir) + for exec_dir in ('bin', 'activity'): + bin_path = join(blob_dir, exec_dir) + if not exists(bin_path): + continue + for filename in os.listdir(bin_path): + os.chmod(join(bin_path, filename), 0755) + + files.update(sel['data'], response.meta) + return cache_call(guid, response.meta['unpack_size']) result = [] for sel in request.session['solution']: @@ -278,11 +261,8 @@ class Routes(object): request.session['stability'], request.session['solution']) return result - def _cache_solution_path(self, guid): - return client.path('solutions', guid[:2], guid) - def _cache_solution_get(self, guid, stability): - path = self._cache_solution_path(guid) + path = client.path('solutions', guid) solution = None if exists(path): try: @@ -305,7 +285,7 @@ class Routes(object): def _cache_solution_set(self, guid, stability, solution): if isinstance(solution, _CachedSolution): return - path = self._cache_solution_path(guid) + path = client.path('solutions', guid) if not exists(dirname(path)): os.makedirs(dirname(path)) with file(path, 'w') as f: @@ -315,13 +295,12 @@ class Routes(object): for context in self._checkin_context(request): if 'clone' not in context['layer']: return self._map_exceptions(self.fallback, request, response) - guid = basename(os.readlink(context.path('.clone'))) - impl = self._volume['release'].get(guid) - response.meta = impl.properties([ + release = this.volume['release'].get(self._solve(request)['guid']) + response.meta = release.properties([ 'guid', 'ctime', 'layer', 'author', 'tags', 'context', 'version', 'stability', 'license', 'notes', 'data', ]) - return impl.meta('data') + return release.meta('data') def _activity_id_new(): @@ -397,7 +376,7 @@ def _exec(context, request, sel): environ['SUGAR_BUNDLE_PATH'] = impl_path environ['SUGAR_BUNDLE_ID'] = context.guid environ['SUGAR_BUNDLE_NAME'] = \ - toolkit.gettext(context['title']).encode('utf8') + i18n.decode(context['title']).encode('utf8') environ['SUGAR_BUNDLE_VERSION'] = sel['version'] environ['SUGAR_ACTIVITY_ROOT'] = datadir environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale') diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index c6ea6d2..50d8632 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -24,6 +24,7 @@ from sugar_network import db, client, node, toolkit, model from sugar_network.client import journal, releases from sugar_network.node.slave import SlaveRoutes from sugar_network.toolkit import netlink, mountpoints +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.router import ACL, Request, Response, Router from sugar_network.toolkit.router import route, fallbackroute from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce @@ -189,44 +190,38 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): yield {'event': 'done', 'guid': guid} @fallbackroute() - def fallback(self, request=None, response=None, method=None, path=None, - cmd=None, content=None, content_stream=None, content_type=None, - **kwargs): + def fallback(self, request=None, response=None, **kwargs): if request is None: - request = Request(method=method, path=path, cmd=cmd, - content=content, content_stream=content_stream, - content_type=content_type) + request = Request(**kwargs) if response is None: response = Response() - request.update(kwargs) - if self._inline.is_set(): - if client.layers.value and \ - request.resource in ('context', 'release'): - request.add('layer', *client.layers.value) - request.principal = self._auth.login - try: - reply = self._node.call(request, response) - if hasattr(reply, 'read'): - if response.relocations: - return reply - else: - return _ResponseStream(reply, self._restart_online) - else: - return reply - except (http.ConnectionError, IncompleteRead): + + if not self._inline.is_set(): + return self._local.call(request, response) + + if client.layers.value and request.resource in ('context', 'release'): + request.add('layer', *client.layers.value) + request.principal = self._auth.login + try: + reply = self._node.call(request, response) + if hasattr(reply, 'read'): if response.relocations: - raise - self._restart_online() - return self._local.call(request, response) - else: + return reply + else: + return _ResponseStream(reply, self._restart_online) + else: + return reply + except (http.ConnectionError, IncompleteRead): + if response.relocations: + raise + self._restart_online() return self._local.call(request, response) def _got_online(self): enforce(not self._inline.is_set()) _logger.debug('Got online on %r', self._node) self._inline.set() - self.broadcast({'event': 'inline', 'state': 'online'}) - self._local.volume.broadcast = None + this.localcast({'event': 'inline', 'state': 'online'}) def _got_offline(self, force=False): if not force and not self._inline.is_set(): @@ -235,9 +230,8 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): self._node.close() if self._inline.is_set(): _logger.debug('Got offline on %r', self._node) - self.broadcast({'event': 'inline', 'state': 'offline'}) + this.localcast({'event': 'inline', 'state': 'offline'}) self._inline.clear() - self._local.volume.broadcast = self.broadcast def _restart_online(self): _logger.debug('Lost %r connection, try to reconnect in %s seconds', @@ -266,16 +260,19 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): mtime = event.get('mtime') if mtime: self.invalidate_solutions(mtime) - self.broadcast(event) + this.broadcast(event) def handshake(url): _logger.debug('Connecting to %r node', url) self._node = client.Connection(url, auth=self._auth) status = self._node.get(cmd='status') self._auth.allow_basic_auth = (status.get('level') == 'master') + """ + TODO switch to seqno impl_info = status['resources'].get('release') if impl_info: self.invalidate_solutions(impl_info['mtime']) + """ if self._inline.is_set(): _logger.info('Reconnected to %r node', url) else: @@ -284,7 +281,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): def connect(): timeout = _RECONNECT_TIMEOUT while True: - self.broadcast({'event': 'inline', 'state': 'connecting'}) + this.localcast({'event': 'inline', 'state': 'connecting'}) for url in self._remote_urls: while True: try: @@ -329,8 +326,7 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): profile['guid'] = self._auth.login volume['user'].create(profile) - self._node = _NodeRoutes(join(db_path, 'node'), volume, - self.broadcast) + self._node = _NodeRoutes(join(db_path, 'node'), volume) self._jobs.spawn(volume.populate) logging.info('Start %r node on %s port', volume.root, node.port.value) @@ -364,6 +360,11 @@ class CachedClientRoutes(ClientRoutes): ClientRoutes._got_offline(self, force) def _push(self): + # TODO should work using regular pull/push + return + + + pushed_seq = toolkit.Sequence() skiped_seq = toolkit.Sequence() volume = self._local.volume @@ -388,24 +389,11 @@ class CachedClientRoutes(ClientRoutes): diff_seq = toolkit.Sequence() post_requests = [] for prop, meta, seqno in patch: - if 'blob' in meta: - request = Request(method='PUT', path=[res, guid, prop]) - request.content_type = meta['mime_type'] - request.content_length = os.stat(meta['blob']).st_size - request.content_stream = \ - toolkit.iter_file(meta['blob']) - post_requests.append((request, seqno)) - elif 'url' in meta: - request = Request(method='PUT', path=[res, guid, prop]) - request.content_type = 'application/json' - request.content = meta - post_requests.append((request, seqno)) - else: - value = meta['value'] - if prop == 'layer': - value = list(set(value) - _LOCAL_LAYERS) - diff[prop] = value - diff_seq.include(seqno, seqno) + value = meta['value'] + if prop == 'layer': + value = list(set(value) - _LOCAL_LAYERS) + diff[prop] = value + diff_seq.include(seqno, seqno) if not diff: continue if 'guid' in diff: @@ -426,7 +414,6 @@ class CachedClientRoutes(ClientRoutes): if not pushed_seq: if not self._push_seq.mtime: self._push_seq.commit() - self.broadcast({'event': 'push'}) return _logger.info('Pushed %r local cache', pushed_seq) @@ -441,38 +428,32 @@ class CachedClientRoutes(ClientRoutes): volume['report'].wipe() self._push_seq.commit() - self.broadcast({'event': 'push'}) -class _LocalRoutes(model.VolumeRoutes, Router): +class _LocalRoutes(db.Routes, Router): def __init__(self, volume): - model.VolumeRoutes.__init__(self, volume) + db.Routes.__init__(self, volume) Router.__init__(self, self) - def on_create(self, request, props, event): + def on_create(self, request, props): props['layer'] = tuple(props['layer']) + ('local',) - model.VolumeRoutes.on_create(self, request, props, event) + db.Routes.on_create(self, request, props) class _NodeRoutes(SlaveRoutes, Router): - def __init__(self, key_path, volume, localcast): + def __init__(self, key_path, volume): SlaveRoutes.__init__(self, key_path, volume) Router.__init__(self, self) self.api_url = 'http://127.0.0.1:%s' % node.port.value - self._localcast = localcast self._mounts = toolkit.Pool() self._jobs = coroutine.Pool() mountpoints.connect(_SYNC_DIRNAME, self.__found_mountcb, self.__lost_mount_cb) - def broadcast(self, event=None, request=None): - SlaveRoutes.broadcast(self, event, request) - self._localcast(event) - def close(self): self.volume.close() @@ -481,27 +462,27 @@ class _NodeRoutes(SlaveRoutes, Router): (self.volume.root, self.api_url) def _sync_mounts(self): - self._localcast({'event': 'sync_start'}) + this.localcast({'event': 'sync_start'}) for mountpoint in self._mounts: - self._localcast({'event': 'sync_next', 'path': mountpoint}) + this.localcast({'event': 'sync_next', 'path': mountpoint}) try: self._offline_session = self._offline_sync( join(mountpoint, _SYNC_DIRNAME), **(self._offline_session or {})) except Exception, error: _logger.exception('Failed to complete synchronization') - self._localcast({'event': 'sync_abort', 'error': str(error)}) + this.localcast({'event': 'sync_abort', 'error': str(error)}) self._offline_session = None raise if self._offline_session is None: _logger.debug('Synchronization completed') - self._localcast({'event': 'sync_complete'}) + this.localcast({'event': 'sync_complete'}) else: _logger.debug('Postpone synchronization with %r session', self._offline_session) - self._localcast({'event': 'sync_paused'}) + this.localcast({'event': 'sync_paused'}) def __found_mountcb(self, path): self._mounts.add(path) diff --git a/sugar_network/client/solver.py b/sugar_network/client/solver.py index 67350b6..84eb9cf 100644 --- a/sugar_network/client/solver.py +++ b/sugar_network/client/solver.py @@ -20,6 +20,7 @@ import logging from os.path import isabs, join, dirname from sugar_network.client import packagekit +from sugar_network.toolkit.router import ACL from sugar_network.toolkit.spec import parse_version from sugar_network.toolkit import http, lsb_release @@ -191,12 +192,10 @@ def _load_feed(context): feed.name = context return feed - feed_content = None + releases = None try: - feed_content = _call(method='GET', path=['context', context], - cmd='feed', layer='origin', stability=_stability, - distro=lsb_release.distributor_id()) - _logger.trace('[%s] Found feed: %r', context, feed_content) + releases = _call(method='GET', path=['context', context, 'releases']) + _logger.trace('[%s] Found feed: %r', context, releases) except http.ServiceUnavailable: _logger.trace('[%s] Failed to fetch the feed', context) raise @@ -204,13 +203,33 @@ def _load_feed(context): _logger.exception('[%s] Failed to fetch the feed', context) return None + """ + for digest, release in releases: + if [i for i in release['author'].values() + if i['role'] & ACL.ORIGINAL] and \ + release['stability'] == _stability and \ + f + + + + + + stability=_stability, + distro=lsb_release.distributor_id()) + """ + + for impl in feed_content['releases']: + feed.implement(impl) + + + # XXX 0install fails on non-ascii `name` values feed.name = context feed.to_resolve = feed_content.get('packages') if not feed.to_resolve: _logger.trace('[%s] No compatible packages', context) - for impl in feed_content['releases']: - feed.implement(impl) + + if not feed.to_resolve and not feed.implementations: _logger.trace('[%s] No releases', context) |