diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-16 10:54:04 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-16 10:54:04 (GMT) |
commit | 83d0187c5742ae539cd63554555a6f285925a149 (patch) | |
tree | 16996dbad88f206ae75bbc0537b3586650264a33 /sugar_network | |
parent | ae31651d6ae31215db903530115bf340ae9f98f5 (diff) |
Switch client code to server side resolves
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/client/__init__.py | 32 | ||||
-rw-r--r-- | sugar_network/client/cache.py | 196 | ||||
-rw-r--r-- | sugar_network/client/injector.py | 463 | ||||
-rw-r--r-- | sugar_network/client/packagekit.py | 92 | ||||
-rw-r--r-- | sugar_network/client/releases.py | 392 | ||||
-rw-r--r-- | sugar_network/client/solver.py | 407 | ||||
-rw-r--r-- | sugar_network/model/__init__.py | 3 | ||||
-rw-r--r-- | sugar_network/node/model.py | 14 | ||||
-rw-r--r-- | sugar_network/node/routes.py | 16 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 31 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 21 | ||||
-rw-r--r-- | sugar_network/toolkit/parcel.py | 2 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 11 |
13 files changed, 593 insertions, 1087 deletions
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py index c863e98..446795a 100644 --- a/sugar_network/client/__init__.py +++ b/sugar_network/client/__init__.py @@ -40,16 +40,16 @@ def profile_path(*args): return join(root_dir, *args) -api_url = Option( - 'url to connect to Sugar Network server API', +api = Option( + 'url to connect to Sugar Network node API', default='http://node-devel.sugarlabs.org', short_option='-a', - name='api-url') + name='api') certfile = Option( - 'path to SSL certificate file to connect to server via HTTPS') + 'path to SSL certificate file to connect to node via HTTPS') no_check_certificate = Option( - 'do not check the server certificate against the available ' + 'do not check the node certificate against the available ' 'certificate authorities', default=False, type_cast=Option.bool_cast, action='store_true') @@ -57,10 +57,10 @@ local_root = Option( 'path to the directory to keep all local data', default=profile_path('network'), name='local_root') -server_mode = Option( - 'start server to share local documents', +node_mode = Option( + 'start node to share local documents', default=False, type_cast=Option.bool_cast, - action='store_true', name='server-mode') + action='store_true', name='node-mode') delayed_start = Option( 'immediate start only database and the rest on getting ' @@ -86,10 +86,10 @@ layers = Option( default=[], type_cast=Option.list_cast, type_repr=Option.list_repr, name='layers') -discover_server = Option( - 'discover servers in local network instead of using --api-url', +discover_node = Option( + 'discover nodes in local network instead of using --api', default=False, type_cast=Option.bool_cast, - action='store_true', name='discover_server') + action='store_true', name='discover-node') cache_limit = Option( 'the minimal disk free space, in bytes, to preserve while recycling ' @@ -113,19 +113,19 @@ cache_timeout = Option( default=3600, type_cast=int, name='cache-timeout') login = Option( - 'Sugar Labs account to connect to Sugar Network API server; ' + 'Sugar Labs account to connect to Sugar Network API node; ' 'should be set only if either password is provided or public key ' 'for Sugar Labs account was uploaded to the Sugar Network', name='login', short_option='-l') password = Option( - 'Sugar Labs account password to connect to Sugar Network API server ' + 'Sugar Labs account password to connect to Sugar Network API node ' 'using Basic authentication; if omitted, keys based authentication ' 'will be used', name='password', short_option='-p') keyfile = Option( - 'path to RSA private key to connect to Sugar Network API server', + 'path to RSA private key to connect to Sugar Network API node', name='keyfile', short_option='-k', default='~/.ssh/sugar-network') @@ -173,13 +173,13 @@ def stability(context): def Connection(url=None, **args): if url is None: - url = api_url.value + url = api.value return http.Connection(url, verify=not no_check_certificate.value, **args) def IPCConnection(): return http.Connection( - api_url='http://127.0.0.1:%s' % ipc_port.value, + api='http://127.0.0.1:%s' % ipc_port.value, # Online ipc->client->node request might fail if node connection # is lost in client process, so, re-send ipc request immediately # to retrive data from client in offline mode without propagating diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py deleted file mode 100644 index 8bee316..0000000 --- a/sugar_network/client/cache.py +++ /dev/null @@ -1,196 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# sugar-lint: disable - -import os -import sys -import time -import logging -from os.path import exists - -from sugar_network import client -from sugar_network.db import blobs -from sugar_network.toolkit import pylru, enforce - - -_POOL_SIZE = 256 - -_logger = logging.getLogger('cache') - - -class Cache(object): - - def __init__(self): - self._pool = None - self._du = 0 - self._acquired = {} - - def __iter__(self): - self._ensure_open() - return iter(self._pool) - - @property - def du(self): - return self._du - - def ensure(self, requested_size, temp_size=0): - self._ensure_open() - to_free = self._to_free(requested_size, temp_size) - if to_free <= 0: - return - enforce(self._du >= to_free, 'No free disk space') - for guid, size, mtime in self._reversed_iter(): - self._checkout(guid, (size, mtime)) - to_free -= size - if to_free <= 0: - break - - def acquire(self, guid, size): - self.checkout(guid) - self._acquired.setdefault(guid, [0, size])[0] += 1 - return guid - - def release(self, *guids): - for guid in guids: - acquired = self._acquired.get(guid) - if acquired is None: - continue - acquired[0] -= 1 - if acquired[0] <= 0: - self.checkin(guid, acquired[1]) - del self._acquired[guid] - - def checkin(self, digest, size): - self._ensure_open() - if digest in self._pool: - self._pool.__getitem__(digest) - return - - - - _logger.debug('Checkin %r %d bytes long', guid, size) - - mtime = os.stat(files.get(digest).path).st_mtime - self._pool[digest] = (size, mtime) - self._du += size - - def checkout(self, guid, *args): - self._ensure_open() - if guid not in self._pool: - return False - _logger.debug('Checkout %r', guid) - size, __ = self._pool.peek(guid) - self._du -= size - del self._pool[guid] - return True - - def recycle(self): - self._ensure_open() - ts = time.time() - to_free = self._to_free(0, 0) - for guid, size, mtime in self._reversed_iter(): - if to_free > 0: - self._checkout(guid, (size, mtime)) - to_free -= size - elif client.cache_lifetime.value and \ - client.cache_lifetime.value < (ts - mtime) / 86400.0: - self._checkout(guid, (size, None)) - else: - break - - def _ensure_open(self): - if self._pool is not None: - return - - _logger.debug('Open releases pool') - - pool = [] - for release in self._volume['release'].find(not_layer=['local'])[0]: - meta = files.get(release['data']) - if not meta: - continue - - """ - TODO - - solution_path = client.path('solutions', release['context']) - if exists(solution_path): - with file(path) as f: - cached_api_url, cached_stability, solution = json.load(f) - if solution[0]['guid'] == release['guid']: - continue - - """ - pool.append(( - os.stat(meta.path).st_mtime, - release.guid, - meta.get('unpack_size') or meta['blob_size'], - )) - - self._pool = pylru.lrucache(_POOL_SIZE, self._checkout) - for mtime, guid, size in sorted(pool): - self._pool[guid] = (size, mtime) - self._du += size - - def _to_free(self, requested_size, temp_size): - if not client.cache_limit.value and \ - not client.cache_limit_percent.value: - return 0 - - stat = os.statvfs(client.local_root.value) - if stat.f_blocks == 0: - # TODO Sounds like a tmpfs or so - return 0 - - limit = sys.maxint - free = stat.f_bfree * stat.f_frsize - if client.cache_limit_percent.value: - total = stat.f_blocks * stat.f_frsize - limit = client.cache_limit_percent.value * total / 100 - if client.cache_limit.value: - limit = min(limit, client.cache_limit.value) - to_free = max(limit, temp_size) - (free - requested_size) - - if to_free > 0: - _logger.debug( - 'Need to recycle %d bytes, ' - 'free_size=%d requested_size=%d temp_size=%d', - to_free, free, requested_size, temp_size) - return to_free - - def _reversed_iter(self): - i = self._pool.head.prev - while True: - while i.empty: - if i is self._pool.head: - return - i = i.prev - size, mtime = i.value - yield i.key, size, mtime - if i is self._pool.head: - break - i = i.next - - def _checkout(self, guid, value): - size, mtime = value - if mtime is None: - _logger.debug('Recycle stale %r to save %s bytes', guid, size) - else: - _logger.debug('Recycle %r to save %s bytes', guid, size) - self._volume['release'].delete(guid) - self._du -= size - if guid in self._pool: - del self._pool[guid] diff --git a/sugar_network/client/injector.py b/sugar_network/client/injector.py new file mode 100644 index 0000000..12baf51 --- /dev/null +++ b/sugar_network/client/injector.py @@ -0,0 +1,463 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import re +import sys +import json +import time +import random +import hashlib +import logging +from os.path import exists, join + +from sugar_network import toolkit +from sugar_network.client import packagekit, journal, profile_path +from sugar_network.toolkit.spec import format_version +from sugar_network.toolkit.bundle import Bundle +from sugar_network.toolkit import lsb_release, coroutine, i18n, pylru, http +from sugar_network.toolkit import enforce + + +_PREEMPTIVE_POOL_SIZE = 256 + +_logger = logging.getLogger('client.injector') + + +class Injector(object): + + seqno = 0 + + def __init__(self, root, lifetime=None, limit_bytes=None, + limit_percent=None): + self._root = root + self._pool = _PreemptivePool(join(root, 'releases'), lifetime, + limit_bytes, limit_percent) + self._api = None + self._checkins = toolkit.Bin(join(root, 'checkins'), {}) + + for dir_name in ('solutions', 'releases'): + dir_path = join(root, dir_name) + if not exists(dir_path): + os.makedirs(dir_path) + + @property + def api(self): + if self._api is not None: + return self._api.url + + @api.setter + def api(self, value): + if not value: + self._api = None + else: + self._api = http.Connection(value) + + def close(self): + self._pool.close() + + def recycle(self): + self._pool.recycle() + + def launch(self, context, stability='stable', app=None, activity_id=None, + object_id=None, uri=None, args=None): + if object_id and not activity_id: + activity_id = journal.get(object_id, 'activity_id') + if not activity_id: + activity_id = _activity_id_new() + yield {'activity_id': activity_id} + + yield {'event': 'launch', 'state': 'init'} + releases = [] + acquired = [] + checkedin = {} + environ = {} + + def acquire(ctx): + solution = self._solve(ctx, stability) + environ.update({'context': ctx, 'solution': solution}) + self._pool.pop(solution.values()) + if ctx in self._checkins: + checkedin[ctx] = (self.api, stability, self.seqno) + else: + _logger.debug('Acquire %r', ctx) + acquired.extend(solution.values()) + releases.extend(solution.values()) + release = solution[ctx] + return release, self._pool.path(release['blob']) + + try: + yield {'event': 'launch', 'state': 'solve'} + release, path = acquire(context) + if app is None and \ + release['content-type'] != 'application/vnd.olpc-sugar': + app = _app_by_mimetype(release['content-type']) + enforce(app, 'Cannot find proper application') + if app is None: + _logger.debug('Execute %r', context) + else: + uri = path + environ['document'] = release['blob'] + release, path = acquire(app) + _logger.debug('Open %r in %r', context, app) + context = app + + for event in self._download(releases): + event['event'] = 'launch' + yield event + for event in self._install(releases): + event['event'] = 'launch' + yield event + + if args is None: + args = [] + args.extend(['-b', context]) + args.extend(['-a', activity_id]) + if object_id: + args.extend(['-o', object_id]) + if uri: + args.extend(['-u', uri]) + child = _exec(context, release, path, args, environ) + yield {'event': 'launch', 'state': 'exec'} + + yield environ + status = child.wait() + finally: + if acquired: + _logger.debug('Release acquired contexts') + self._pool.push(acquired) + + if checkedin: + with self._checkins as checkins: + checkins.update(checkedin) + + _logger.debug('Exit %s[%s]: %r', context, child.pid, status) + enforce(status == 0, 'Process exited with %r status', status) + yield {'event': 'launch', 'state': 'exit'} + + def checkin(self, context, stability='stable'): + if context in self._checkins: + _logger.debug('Refresh %r checkin', context) + else: + _logger.debug('Checkin %r', context) + yield {'event': 'checkin', 'state': 'solve'} + solution = self._solve(context, stability) + for event in self._download(solution.values()): + event['event'] = 'checkin' + yield event + self._pool.pop(solution.values()) + with self._checkins as checkins: + checkins[context] = (self.api, stability, self.seqno) + yield {'event': 'checkin', 'state': 'ready'} + + def checkout(self, context): + if context not in self._checkins: + return False + _logger.debug('Checkout %r', context) + with file(join(self._root, 'solutions', context)) as f: + __, __, __, solution = json.load(f) + self._pool.push(solution.values()) + with self._checkins as checkins: + del checkins[context] + return True + + def _solve(self, context, stability): + path = join(self._root, 'solutions', context) + solution = None + + if exists(path): + with file(path) as f: + api, stability_, seqno, solution = json.load(f) + if self.api: + if api != self.api or \ + stability_ and set(stability_) != set(stability) or \ + seqno < self.seqno or \ + int(os.stat(path).st_mtime) < packagekit.mtime(): + _logger.debug('Reset stale %r solution', context) + solution = None + else: + _logger.debug('Reuse cached %r solution', context) + else: + _logger.debug('Reuse cached %r solution in offline', context) + + if not solution: + enforce(self.api, 'Cannot solve in offline') + _logger.debug('Solve %r', context) + solution = self._api.get(['context', context], cmd='solve', + stability=stability, lsb_id=lsb_release.distributor_id(), + lsb_release=lsb_release.release()) + with toolkit.new_file(path) as f: + json.dump((self.api, stability, self.seqno, solution), f) + + return solution + + def _download(self, solution): + to_download = [] + download_size = 0 + size = 0 + + for release in solution: + digest = release.get('blob') + if not digest or exists(self._pool.path(digest)): + continue + enforce(self._api is not None, 'Cannot download in offline') + download_size = max(download_size, release['size']) + size += release.get('unpack_size') or release['size'] + to_download.append((digest, release)) + + if not to_download: + return + + self._pool.ensure(size, download_size) + for digest, release in to_download: + yield {'state': 'download'} + with toolkit.NamedTemporaryFile() as tmp_file: + self._api.download(['blobs', digest], tmp_file.name) + path = self._pool.path(digest) + if 'unpack_size' in release: + with Bundle(tmp_file, 'application/zip') as bundle: + bundle.extractall(path, prefix=bundle.rootdir) + for exec_dir in ('bin', 'activity'): + bin_path = join(path, exec_dir) + if not exists(bin_path): + continue + for filename in os.listdir(bin_path): + os.chmod(join(bin_path, filename), 0755) + else: + os.rename(tmp_file.name, path) + + def _install(self, solution): + to_install = [] + + for release in solution: + packages = release.get('packages') + if packages: + to_install.extend(packages) + + if to_install: + yield {'state': 'install'} + packagekit.install(to_install) + + +class _PreemptivePool(object): + + def __init__(self, root, lifetime, limit_bytes, limit_percent): + self._root = root + self._lifetime = lifetime + self._limit_bytes = limit_bytes + self._limit_percent = limit_percent + self._lru = None + self._du = None + + def __iter__(self): + """Least recently to most recently used iterator.""" + if self._lru is None: + self._init() + i = self._lru.head.prev + while True: + while i.empty: + if i is self._lru.head: + return + i = i.prev + yield i.key, i.value + if i is self._lru.head: + break + i = i.prev + + def close(self): + if self._lru is not None: + with toolkit.new_file(self._root + '.index') as f: + json.dump((self._du, [i for i in self]), f) + self._lru = None + + def path(self, digest): + return join(self._root, digest) + + def push(self, solution): + if self._lru is None: + self._init() + for release in solution: + digest = release.get('blob') + if not digest: + continue + path = join(self._root, digest) + if not exists(path): + continue + size = release.get('unpack_size') or release['size'] + self._lru[digest] = (size, os.stat(path).st_mtime) + self._du += size + _logger.debug('Push %r release %s bytes', digest, size) + + def pop(self, solution): + if self._lru is None: + self._init() + found = False + for release in solution: + digest = release.get('blob') + if digest and digest in self._lru: + self._pop(digest, False) + found = True + return found + + def ensure(self, requested_size, temp_size=0): + if self._lru is None: + self._init() + to_free = self._to_free(requested_size, temp_size) + if to_free <= 0: + return + enforce(self._du >= to_free, 'No free disk space') + for digest, (size, __) in self: + self._pop(digest) + to_free -= size + if to_free <= 0: + break + + def recycle(self): + if self._lru is None: + self._init() + ts = time.time() + to_free = self._to_free(0, 0) + for digest, (size, mtime) in self: + if to_free > 0: + self._pop(digest) + to_free -= size + elif self._lifetime and self._lifetime < (ts - mtime) / 86400.0: + self._pop(digest) + else: + break + + def _init(self): + self._lru = pylru.lrucache(_PREEMPTIVE_POOL_SIZE, self._pop) + if not exists(self._root + '.index'): + self._du = 0 + else: + with file(self._root + '.index') as f: + self._du, items = json.load(f) + for key, value in items: + self._lru[key] = value + + def _pop(self, digest, unlink=True): + size, __ = self._lru.peek(digest) + _logger.debug('Pop %r release and save %s bytes', digest, size) + self._du -= size + del self._lru[digest] + path = join(self._root, digest) + if unlink and exists(path): + os.unlink(path) + + def _to_free(self, requested_size, temp_size): + if not self._limit_bytes and not self._limit_percent: + return 0 + + stat = os.statvfs(self._root) + if stat.f_blocks == 0: + # TODO Sounds like a tmpfs or so + return 0 + + limit = sys.maxint + free = stat.f_bfree * stat.f_frsize + if self._limit_percent: + total = stat.f_blocks * stat.f_frsize + limit = self._limit_percent * total / 100 + if self._limit_bytes: + limit = min(limit, self._limit_bytes) + to_free = max(limit, temp_size) - (free - requested_size) + + if to_free > 0: + _logger.debug( + 'Need to recycle %d bytes, ' + 'free_size=%d requested_size=%d temp_size=%d', + to_free, free, requested_size, temp_size) + return to_free + + +def _exec(context, release, path, args, environ): + # pylint: disable-msg=W0212 + datadir = profile_path('data', context) + logdir = profile_path('logs') + + for i in [ + join(datadir, 'instance'), + join(datadir, 'data'), + join(datadir, 'tmp'), + logdir, + ]: + if not exists(i): + os.makedirs(i) + + log_path = toolkit.unique_filename(logdir, context + '.log') + environ['logs'] = [ + profile_path('logs', 'shell.log'), + profile_path('logs', 'sugar-network-client.log'), + log_path, + ] + + __, command = release['command'] + args = command.split() + args + environ['args'] = args + + child = coroutine.fork() + if child is not None: + _logger.debug('Exec %s[%s]: %r', context, child.pid, args) + return child + try: + with file('/dev/null', 'r') as f: + os.dup2(f.fileno(), 0) + with file(log_path, 'a+') as f: + os.dup2(f.fileno(), 1) + os.dup2(f.fileno(), 2) + toolkit.init_logging() + + os.chdir(path) + + environ = os.environ + environ['PATH'] = ':'.join([ + join(path, 'activity'), + join(path, 'bin'), + environ['PATH'], + ]) + environ['PYTHONPATH'] = path + ':' + environ.get('PYTHONPATH', '') + environ['SUGAR_BUNDLE_PATH'] = path + environ['SUGAR_BUNDLE_ID'] = context + environ['SUGAR_BUNDLE_NAME'] = i18n.decode(release['title']) + environ['SUGAR_BUNDLE_VERSION'] = format_version(release['version']) + environ['SUGAR_ACTIVITY_ROOT'] = datadir + environ['SUGAR_LOCALEDIR'] = join(path, 'locale') + + os.execvpe(args[0], args, environ) + except BaseException: + logging.exception('Failed to execute %r args=%r', release, args) + finally: + os._exit(1) + + +def _activity_id_new(): + from uuid import getnode + data = '%s%s%s' % ( + time.time(), + random.randint(10000, 100000), + getnode()) + return hashlib.sha1(data).hexdigest() + + +def _app_by_mimetype(mime_type): + import gconf + mime_type = _MIMETYPE_INVALID_CHARS.sub('_', mime_type) + key = '/'.join([_MIMETYPE_DEFAULTS_KEY, mime_type]) + return gconf.client_get_default().get_string(key) + + +_MIMETYPE_DEFAULTS_KEY = '/desktop/sugar/journal/defaults' +_MIMETYPE_INVALID_CHARS = re.compile('[^a-zA-Z0-9-_/.]') diff --git a/sugar_network/client/packagekit.py b/sugar_network/client/packagekit.py index 782f09e..68772e8 100644 --- a/sugar_network/client/packagekit.py +++ b/sugar_network/client/packagekit.py @@ -1,4 +1,4 @@ -# Copyright (C) 2010-2013 Aleksey Lim +# Copyright (C) 2010-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,6 +14,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os +import re import logging from sugar_network.toolkit import lsb_release, gbus, enforce @@ -37,7 +38,23 @@ def mtime(): return os.stat(_pms_path).st_mtime -def resolve(names): +def install(names): + ids = [i['pk_id'] for i in _resolve(names)] + while ids: + chunk = ids[:min(len(ids), _PK_MAX_INSTALL)] + del ids[:len(chunk)] + + _logger.debug('Install %r', chunk) + + resp = gbus.call(_pk, 'InstallPackages', True, chunk) + enforce(resp.error_code in ( + 'package-already-installed', + 'all-packages-already-installed', None), + 'Installation failed: %s (%s)', + resp.error_details, resp.error_code) + + +def _resolve(names): result = {} while names: @@ -54,22 +71,6 @@ def resolve(names): return result -def install(packages): - ids = [i['pk_id'] for i in packages] - while ids: - chunk = ids[:min(len(ids), _PK_MAX_INSTALL)] - del ids[:len(chunk)] - - _logger.debug('Install %r', chunk) - - resp = gbus.call(_pk, 'InstallPackages', True, chunk) - enforce(resp.error_code in ( - 'package-already-installed', - 'all-packages-already-installed', None), - 'Installation failed: %s (%s)', - resp.error_details, resp.error_code) - - class _Response(object): def __init__(self): @@ -104,10 +105,8 @@ def _pk(result, op, *args): resp.error_details = details def Package_cb(status, pk_id, summary): - from sugar_network.client import solver - package_name, version, arch, __ = pk_id.split(';') - clean_version = solver.try_cleanup_distro_version(version) + clean_version = _cleanup_distro_version(version) if not clean_version: _logger.warn('Cannot parse distribution version "%s" ' 'for package "%s"', version, package_name) @@ -117,7 +116,7 @@ def _pk(result, op, *args): 'pk_id': str(pk_id), 'version': clean_version, 'name': package_name, - 'arch': solver.canonicalize_machine(arch), + 'arch': _canonicalize_machine(arch), 'installed': (status == 'installed'), } _logger.debug('Found: %r', package) @@ -148,6 +147,51 @@ def _pk(result, op, *args): op(*args) +def _canonicalize_machine(arch): + arch = arch.lower() + if arch == 'x86': + return 'i386' + elif arch == 'amd64': + return 'x86_64' + elif arch == 'power macintosh': + return 'ppc' + elif arch == 'i86pc': + return 'i686' + + +def _cleanup_distro_version(version): + if ':' in version: + # Skip 'epoch' + version = version.split(':', 1)[1] + version = version.replace('_', '-') + if '~' in version: + version, suffix = version.split('~', 1) + if suffix.startswith('pre'): + suffix = suffix[3:] + suffix = '-pre' + (_cleanup_distro_version(suffix) or '') + else: + suffix = '' + match = _VERSION_RE.match(version) + if match: + major, version, revision = match.groups() + if major is not None: + version = major[:-1].rstrip('.') + '.' + version + if revision is not None: + version = '%s-%s' % (version, revision[2:]) + return version + suffix + return None + + +_DOTTED_RE = r'[0-9]+(?:\.[0-9]+)*' +# Matche a version number that would be a valid version without modification +_RELEASE_RE = '(?:%s)(?:-(?:pre|rc|post|)(?:%s))*' % (_DOTTED_RE, _DOTTED_RE) +# This matches the interesting bits of distribution version numbers +# (first matching group is for Java-style 6b17 or 7u9 syntax, or "major") +_VERSION_RE = re.compile( + r'(?:[a-z])?({ints}\.?[bu])?({zero})(-r{ints})?'.format( + zero=_RELEASE_RE, ints=_DOTTED_RE)) + + if __name__ == '__main__': import sys from pprint import pprint @@ -158,6 +202,6 @@ if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) if sys.argv[1] == 'install': - install(resolve(sys.argv[2:]).values()) + install(_resolve(sys.argv[2:]).values()) else: - pprint(resolve(sys.argv[1:])) + pprint(_resolve(sys.argv[1:])) diff --git a/sugar_network/client/releases.py b/sugar_network/client/releases.py deleted file mode 100644 index c93a91a..0000000 --- a/sugar_network/client/releases.py +++ /dev/null @@ -1,392 +0,0 @@ -# Copyright (C) 2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# pylint: disable=E1101 - -import os -import re -import sys -import time -import json -import random -import shutil -import hashlib -import logging -from copy import deepcopy -from os.path import join, exists, basename, dirname, relpath - -from sugar_network import client, toolkit -from sugar_network.client.cache import Cache -from sugar_network.client import journal, packagekit -from sugar_network.toolkit.router import Request, Response, route -from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import i18n, http, coroutine, enforce - - -_MIMETYPE_DEFAULTS_KEY = '/desktop/sugar/journal/defaults' -_MIMETYPE_INVALID_CHARS = re.compile('[^a-zA-Z0-9-_/.]') - -_logger = logging.getLogger('releases') - - -class Routes(object): - - def __init__(self): - self._node_mtime = None - self._call = lambda **kwargs: \ - self._map_exceptions(self.fallback, **kwargs) - self._cache = Cache() - - def invalidate_solutions(self, mtime): - self._node_mtime = mtime - - @route('GET', ['context', None], cmd='path') - def path(self, request): - clone = self._solve(request) - enforce(clone is not None, http.NotFound, 'No clones') - return clone['path'] - - @route('GET', ['context', None], cmd='launch', arguments={'args': list}, - mime_type='text/event-stream') - def launch(self, request): - activity_id = request.get('activity_id') - if 'object_id' in request and not activity_id: - activity_id = journal.get(request['object_id'], 'activity_id') - if not activity_id: - activity_id = _activity_id_new() - request.session['activity_id'] = activity_id - - for context in self._checkin_context(request): - yield {'event': 'launch', 'activity_id': activity_id}, request - - acquired = [] - try: - impl = self._solve(request, context['type']) - if 'activity' not in context['type']: - app = request.get('context') or \ - _mimetype_context(impl['data']['mime_type']) - enforce(app, 'Cannot find proper application') - acquired += self._checkin( - context, request, self._cache.acquire) - request = Request(path=['context', app], - object_id=impl['path'], session=request.session) - for context in self._checkin_context(request): - impl = self._solve(request, context['type']) - acquired += self._checkin( - context, request, self._cache.acquire) - - child = _exec(context, request, impl) - yield {'event': 'exec', 'activity_id': activity_id} - status = child.wait() - finally: - self._cache.release(*acquired) - - _logger.debug('Exit %s[%s]: %r', context.guid, child.pid, status) - enforce(status == 0, 'Process exited with %r status', status) - yield {'event': 'exit', 'activity_id': activity_id} - - @route('PUT', ['context', None], cmd='clone', arguments={'requires': list}, - mime_type='text/event-stream') - def clone(self, request): - enforce(not request.content or self.inline(), http.ServiceUnavailable, - 'Not available in offline') - for context in self._checkin_context(request, 'clone'): - if request.content: - impl = self._solve(request, context['type']) - self._checkin(context, request, self._cache.checkout) - yield {'event': 'ready'} - else: - clone = self._solve(request) - meta = this.volume['release'].get(clone['guid']).meta('data') - size = meta.get('unpack_size') or meta['blob_size'] - self._cache.checkin(clone['guid'], size) - - @route('GET', ['context', None], cmd='clone', - arguments={'requires': list}) - def get_clone(self, request, response): - return self._get_clone(request, response) - - @route('HEAD', ['context', None], cmd='clone', - arguments={'requires': list}) - def head_clone(self, request, response): - self._get_clone(request, response) - - @route('PUT', ['context', None], cmd='favorite') - def favorite(self, request): - for __ in self._checkin_context(request, 'favorite'): - pass - - @route('GET', cmd='recycle') - def recycle(self): - return self._cache.recycle() - - def _map_exceptions(self, fun, *args, **kwargs): - try: - return fun(*args, **kwargs) - except http.NotFound, error: - if self.inline(): - raise - raise http.ServiceUnavailable, error, sys.exc_info()[2] - - def _checkin_context(self, request, layer=None): - contexts = this.volume['context'] - guid = request.guid - if layer and not request.content and not contexts.exists(guid): - return - - if not contexts.exists(guid): - patch = self._call(method='GET', path=['context', guid], cmd='diff') - contexts.merge(guid, patch) - context = contexts.get(guid) - if layer and bool(request.content) == (layer in context['layer']): - return - - yield context - - if layer: - if request.content: - layer_value = set(context['layer']) | set([layer]) - else: - layer_value = set(context['layer']) - set([layer]) - contexts.update(guid, {'layer': list(layer_value)}) - _logger.debug('Checked %r in: %r', guid, layer_value) - - def _solve(self, request, force_type=None): - stability = request.get('stability') or \ - client.stability(request.guid) - - request.session['stability'] = stability - request.session['logs'] = [ - client.profile_path('logs', 'shell.log'), - client.profile_path('logs', 'sugar-network-client.log'), - ] - - _logger.debug('Solving %r stability=%r', request.guid, stability) - - solution, stale = self._cache_solution_get(request.guid, stability) - if stale is False: - _logger.debug('Reuse cached %r solution', request.guid) - elif solution is not None and (not force_type or not self.inline()): - _logger.debug('Reuse stale %r solution', request.guid) - elif not force_type: - return None - elif 'activity' in force_type: - from sugar_network.client import solver - solution = self._map_exceptions(solver.solve, - self.fallback, request.guid, stability) - else: - response = Response() - blob = self._call(method='GET', path=['context', request.guid], - cmd='clone', stability=stability, response=response) - release = response.meta - release['mime_type'] = response.content_type - release['size'] = response.content_length - files.post(blob, digest=release['spec']['*-*']['bundle']) - solution = [release] - - request.session['solution'] = solution - return solution[0] - - def _checkin(self, context, request, cache_call): - if 'clone' in context['layer']: - cache_call = self._cache.checkout - - if 'activity' in context['type']: - to_install = [] - for sel in request.session['solution']: - if 'install' in sel: - enforce(self.inline(), http.ServiceUnavailable, - 'Installation is not available in offline') - to_install.extend(sel.pop('install')) - if to_install: - packagekit.install(to_install) - - def cache_impl(sel): - guid = sel['guid'] - - - - - data = files.get(guid) - - if data is not None: - return cache_call(guid, data['unpack_size']) - - response = Response() - blob = self._call(method='GET', path=['release', guid, 'data'], - response=response) - - if 'activity' not in context['type']: - self._cache.ensure(response.content_length) - files.post(blob, response.meta, sel['data']) - return cache_call(guid, response.content_length) - - with toolkit.mkdtemp(dir=files.path(sel['data'])) as blob_dir: - self._cache.ensure( - response.meta['unpack_size'], - response.content_length) - with toolkit.TemporaryFile() as tmp_file: - shutil.copyfileobj(blob, tmp_file) - tmp_file.seek(0) - with Bundle(tmp_file, 'application/zip') as bundle: - bundle.extractall(blob_dir, prefix=bundle.rootdir) - for exec_dir in ('bin', 'activity'): - bin_path = join(blob_dir, exec_dir) - if not exists(bin_path): - continue - for filename in os.listdir(bin_path): - os.chmod(join(bin_path, filename), 0755) - - files.update(sel['data'], response.meta) - return cache_call(guid, response.meta['unpack_size']) - - result = [] - for sel in request.session['solution']: - if 'path' not in sel and sel['stability'] != 'packaged': - result.append(cache_impl(sel)) - self._cache_solution_set(context.guid, - request.session['stability'], request.session['solution']) - return result - - def _cache_solution_get(self, guid, stability): - path = client.path('solutions', guid) - solution = None - if exists(path): - try: - with file(path) as f: - cached_api_url, cached_stability, solution = json.load(f) - except Exception, error: - _logger.debug('Cannot open %r solution: %s', path, error) - if solution is None: - return None, None - - stale = (cached_api_url != client.api_url.value) - if not stale and cached_stability is not None: - stale = set(cached_stability) != set(stability) - if not stale and self._node_mtime is not None: - stale = (self._node_mtime > os.stat(path).st_mtime) - if not stale: - stale = (packagekit.mtime() > os.stat(path).st_mtime) - return _CachedSolution(solution), stale - - def _cache_solution_set(self, guid, stability, solution): - if isinstance(solution, _CachedSolution): - return - path = client.path('solutions', guid) - if not exists(dirname(path)): - os.makedirs(dirname(path)) - with file(path, 'w') as f: - json.dump([client.api_url.value, stability, solution], f) - - def _get_clone(self, request, response): - for context in self._checkin_context(request): - if 'clone' not in context['layer']: - return self._map_exceptions(self.fallback, request, response) - release = this.volume['release'].get(self._solve(request)['guid']) - response.meta = release.properties([ - 'guid', 'ctime', 'layer', 'author', 'tags', - 'context', 'version', 'stability', 'license', 'notes', 'data', - ]) - return release.meta('data') - - -def _activity_id_new(): - from uuid import getnode - data = '%s%s%s' % ( - time.time(), - random.randint(10000, 100000), - getnode()) - return hashlib.sha1(data).hexdigest() - - -def _mimetype_context(mime_type): - import gconf - mime_type = _MIMETYPE_INVALID_CHARS.sub('_', mime_type) - key = '/'.join([_MIMETYPE_DEFAULTS_KEY, mime_type]) - return gconf.client_get_default().get_string(key) - - -def _exec(context, request, sel): - # pylint: disable-msg=W0212 - datadir = client.profile_path('data', context.guid) - logdir = client.profile_path('logs') - - for path in [ - join(datadir, 'instance'), - join(datadir, 'data'), - join(datadir, 'tmp'), - logdir, - ]: - if not exists(path): - os.makedirs(path) - - cmd = sel['data']['spec']['*-*']['commands']['activity']['exec'] - args = cmd.split() + [ - '-b', request.guid, - '-a', request.session['activity_id'], - ] - if 'object_id' in request: - args.extend(['-o', request['object_id']]) - if 'uri' in request: - args.extend(['-u', request['uri']]) - if 'args' in request: - args.extend(request['args']) - request.session['args'] = args - - log_path = toolkit.unique_filename(logdir, context.guid + '.log') - request.session['logs'].append(log_path) - - child = coroutine.fork() - if child is not None: - _logger.debug('Exec %s[%s]: %r', request.guid, child.pid, args) - return child - - try: - with file('/dev/null', 'r') as f: - os.dup2(f.fileno(), 0) - with file(log_path, 'a+') as f: - os.dup2(f.fileno(), 1) - os.dup2(f.fileno(), 2) - toolkit.init_logging() - - impl_path = sel['path'] - os.chdir(impl_path) - - environ = os.environ - environ['PATH'] = ':'.join([ - join(impl_path, 'activity'), - join(impl_path, 'bin'), - environ['PATH'], - ]) - environ['PYTHONPATH'] = impl_path + ':' + \ - environ.get('PYTHONPATH', '') - environ['SUGAR_BUNDLE_PATH'] = impl_path - environ['SUGAR_BUNDLE_ID'] = context.guid - environ['SUGAR_BUNDLE_NAME'] = \ - i18n.decode(context['title']).encode('utf8') - environ['SUGAR_BUNDLE_VERSION'] = sel['version'] - environ['SUGAR_ACTIVITY_ROOT'] = datadir - environ['SUGAR_LOCALEDIR'] = join(impl_path, 'locale') - - os.execvpe(args[0], args, environ) - except BaseException: - logging.exception('Failed to execute %r args=%r', sel, args) - finally: - os._exit(1) - - -class _CachedSolution(list): - pass diff --git a/sugar_network/client/solver.py b/sugar_network/client/solver.py deleted file mode 100644 index 84eb9cf..0000000 --- a/sugar_network/client/solver.py +++ /dev/null @@ -1,407 +0,0 @@ -# Copyright (C) 2010-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# pylint: disable-msg=W0611,F0401,W0201,E1101,W0232 - -import sys -import logging -from os.path import isabs, join, dirname - -from sugar_network.client import packagekit -from sugar_network.toolkit.router import ACL -from sugar_network.toolkit.spec import parse_version -from sugar_network.toolkit import http, lsb_release - -sys.path.insert(0, join(dirname(__file__), '..', 'lib', 'zeroinstall')) - -from zeroinstall.injector import reader, model, arch as _arch -from zeroinstall.injector.config import Config -from zeroinstall.injector.driver import Driver -from zeroinstall.injector.requirements import Requirements -from zeroinstall.injector.arch import machine_ranks -from zeroinstall.injector.distro import try_cleanup_distro_version - - -_SUGAR_API_COMPATIBILITY = { - '0.94': frozenset(['0.86', '0.88', '0.90', '0.92', '0.94']), - } - -model.Interface.__init__ = lambda *args: _interface_init(*args) -reader.check_readable = lambda *args, **kwargs: True -reader.update_from_cache = lambda *args, **kwargs: None -reader.load_feed_from_cache = lambda url, **kwargs: _load_feed(url) - -_logger = logging.getLogger('solver') -_stability = None -_call = None - - -def canonicalize_machine(arch): - if arch in ('noarch', 'all'): - return None - return _arch.canonicalize_machine(arch) - - -def select_architecture(arches): - """Select most appropriate, for the host system, machine architecture - - :param arches: - list of architecture names to select - :returns: - one of passed architecture names, or, `None` if not any - - """ - result_rank = 9999 - result_arch = None - for arch in arches: - rank = machine_ranks.get(canonicalize_machine(arch)) - if rank is not None and rank < result_rank: - result_rank = rank - result_arch = arch - return result_arch - - -def solve(call, context, stability): - global _call, _stability - - _call = call - _stability = stability - - req = Requirements(context) - # TODO - req.command = 'activity' - config = Config() - driver = Driver(config, req) - solver = driver.solver - solver.record_details = True - status = None - ready = False - - while True: - solver.solve(context, driver.target_arch, command_name=req.command) - if ready and solver.ready: - break - ready = solver.ready - - resolved = None - for url in solver.feeds_used: - feed = config.iface_cache.get_feed(url) - if feed is None: - continue - while feed.to_resolve: - try: - resolved = packagekit.resolve(feed.to_resolve.pop(0)) - except Exception, error: - if feed.to_resolve: - continue - if status is None: - status = call(method='GET', cmd='whoami') - if status['route'] == 'offline': - raise http.ServiceUnavailable(str(error)) - else: - raise - feed.resolve(resolved.values()) - feed.to_resolve = None - if not resolved: - break - - selections = solver.selections.selections - missed = [] - - top_summary = [] - dep_summary = [] - for iface, impls in solver.details.items(): - summary = (top_summary if iface.uri == context else dep_summary) - summary.append(iface.uri) - if impls: - sel = selections.get(iface.uri) - for impl, reason in impls: - if not reason and sel is None: - reason = 'wrong version' - missed.append(iface.uri) - if reason: - reason = '(%s)' % reason - summary.append('%s v%s %s' % ( - '*' if sel is not None and sel.impl is impl else ' ', - impl.get_version(), - reason or '', - )) - else: - summary.append(' (no versions)') - missed.append(iface.uri) - _logger.debug('[%s] Solving results:\n%s', - context, '\n'.join(top_summary + dep_summary)) - - if not ready: - # pylint: disable-msg=W0212 - reason_exception = solver.get_failure_reason() - if reason_exception is not None: - reason = reason_exception.message - else: - reason = 'Cannot find releases for %s' % ', '.join(missed) - raise http.NotFound(reason) - - solution = [] - solution.append(_impl_new(config, context, selections[context])) - for iface, sel in selections.items(): - if sel is not None and iface != context: - solution.append(_impl_new(config, iface, sel)) - - return solution - - -def _interface_init(self, url): - self.uri = url - self.reset() - - -def _impl_new(config, iface, sel): - impl = sel.impl.sn_impl - impl['context'] = iface - if sel.local_path: - impl['path'] = sel.local_path - if sel.impl.to_install: - impl['install'] = sel.impl.to_install - return impl - - -def _load_feed(context): - feed = _Feed(context) - - if context == 'sugar': - try: - from jarabe import config - host_version = '.'.join(config.version.split('.', 2)[:2]) - except ImportError: - # XXX sweets-sugar binding might be not sourced - host_version = '0.94' - for version in _SUGAR_API_COMPATIBILITY.get(host_version) or []: - feed.implement_sugar(version) - feed.name = context - return feed - - releases = None - try: - releases = _call(method='GET', path=['context', context, 'releases']) - _logger.trace('[%s] Found feed: %r', context, releases) - except http.ServiceUnavailable: - _logger.trace('[%s] Failed to fetch the feed', context) - raise - except Exception: - _logger.exception('[%s] Failed to fetch the feed', context) - return None - - """ - for digest, release in releases: - if [i for i in release['author'].values() - if i['role'] & ACL.ORIGINAL] and \ - release['stability'] == _stability and \ - f - - - - - - stability=_stability, - distro=lsb_release.distributor_id()) - """ - - for impl in feed_content['releases']: - feed.implement(impl) - - - - # XXX 0install fails on non-ascii `name` values - feed.name = context - feed.to_resolve = feed_content.get('packages') - if not feed.to_resolve: - _logger.trace('[%s] No compatible packages', context) - - - if not feed.to_resolve and not feed.implementations: - _logger.trace('[%s] No releases', context) - - return feed - - -class _Feed(model.ZeroInstallFeed): - # pylint: disable-msg=E0202 - - def __init__(self, context): - self.context = context - self.local_path = None - self.implementations = {} - self.last_modified = None - self.feeds = [] - self.metadata = [] - self.last_checked = None - self.to_resolve = None - self._package_implementations = [] - - @property - def url(self): - return self.context - - @property - def feed_for(self): - return set([self.context]) - - def resolve(self, packages): - top_package = packages[0] - - impl = _Release(self, self.context, None) - impl.version = parse_version(top_package['version']) - impl.released = 0 - impl.arch = '*-%s' % (top_package['arch'] or '*') - impl.upstream_stability = model.stability_levels['packaged'] - impl.to_install = [i for i in packages if not i['installed']] - impl.add_download_source(self.context, 0, None) - impl.sn_impl = { - 'guid': self.context, - 'license': None, - 'version': top_package['version'], - 'stability': 'packaged', - } - - self.implementations[self.context] = impl - - def implement(self, release): - impl_id = release['guid'] - spec = release['data']['spec']['*-*'] - - impl = _Release(self, impl_id, None) - impl.version = parse_version(release['version']) - impl.released = 0 - impl.arch = '*-*' - impl.upstream_stability = model.stability_levels['stable'] - impl.license = release.get('license') or [] - impl.requires = _read_requires(spec.get('requires')) - impl.requires.extend(_read_requires(release.get('requires'))) - impl.sn_impl = release - - if isabs(impl_id): - impl.local_path = impl_id - else: - impl.add_download_source(impl_id, 0, None) - - for name, command in spec['commands'].items(): - impl.commands[name] = _Command(name, command) - - for name, insert, mode in spec.get('bindings') or []: - binding = model.EnvironmentBinding(name, insert, mode=mode) - impl.bindings.append(binding) - - self.implementations[impl_id] = impl - - def implement_sugar(self, sugar_version): - impl_id = 'sugar-%s' % sugar_version - impl = _Release(self, impl_id, None) - impl.version = parse_version(sugar_version) - impl.released = 0 - impl.arch = '*-*' - impl.upstream_stability = model.stability_levels['packaged'] - self.implementations[impl_id] = impl - impl.sn_impl = { - 'guid': impl_id, - 'license': None, - 'version': sugar_version, - 'stability': 'packaged', - } - - -class _Release(model.ZeroInstallImplementation): - - to_install = None - sn_impl = None - license = None - - def is_available(self, stores): - # Simplify solving - return True - - -class _Dependency(model.InterfaceDependency): - - def __init__(self, guid, data): - self._importance = data.get('importance', model.Dependency.Essential) - self._metadata = {} - self.qdom = None - self.interface = guid - self.restrictions = [] - self.bindings = [] - - for not_before, before in data.get('restrictions') or []: - restriction = model.VersionRangeRestriction( - not_before=parse_version(not_before), - before=parse_version(before)) - self.restrictions.append(restriction) - - @property - def context(self): - return self.interface - - @property - def metadata(self): - return self._metadata - - @property - def importance(self): - return self._importance - - def get_required_commands(self): - return [] - - @property - def command(self): - pass - - -class _Command(model.Command): - - def __init__(self, name, command): - self.qdom = None - self.name = name - self._requires = _read_requires(command.get('requires')) - - @property - def path(self): - return 'doesnt_matter' - - @property - def requires(self): - return self._requires - - def get_runner(self): - pass - - def __str__(self): - return '' - - @property - def bindings(self): - return [] - - -def _read_requires(data): - result = [] - for guid, dep_data in (data or {}).items(): - result.append(_Dependency(guid, dep_data)) - return result - - -if __name__ == '__main__': - from pprint import pprint - logging.basicConfig(level=logging.DEBUG) - pprint(solve(*sys.argv[1:])) diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index 77a322c..9e1aaf5 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -140,9 +140,10 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): release['license'] = this.request['license'] if isinstance(release['license'], basestring): release['license'] = [release['license']] + release['stability'] = 'stable' release['bundles'] = { '*-*': { - 'bundle': blob.digest, + 'blob': blob.digest, }, } else: diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 559f6b4..8de6038 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -21,7 +21,7 @@ from sugar_network.model import Release, context as base_context from sugar_network.node import obs from sugar_network.toolkit.router import ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import spec, sat, http, coroutine, enforce +from sugar_network.toolkit import spec, sat, http, coroutine, i18n, enforce _logger = logging.getLogger('node.model') @@ -133,7 +133,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, top_context.guid, lsb_id, lsb_release, stability, top_requires) def rate_release(digest, release): - return [command in release['commands'], + return [command in release.get('commands', []), _STABILITY_RATES.get(release['stability']) or 0, release['version'], digest, @@ -186,17 +186,23 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, for release in reversed(candidates): digest = release[-1] release = releases[digest]['value'] - release_info = {'version': release['version'], 'blob': digest} + release_info = { + 'title': i18n.decode(context['title'], + this.request.accept_language), + 'version': release['version'], + 'blob': digest, + } blob = volume.blobs.get(digest) if blob is not None: release_info['size'] = blob.size + release_info['content-type'] = blob['content-type'] unpack_size = release['bundles']['*-*'].get('unpack_size') if unpack_size is not None: release_info['unpack_size'] = unpack_size requires = release.get('requires') or {} if top_requires and context.guid == top_context.guid: requires.update(top_requires) - if context.guid == top_context.guid: + if context.guid == top_context.guid and 'commands' in release: cmd = release['commands'].get(command) if cmd is None: cmd_name, cmd = release['commands'].items()[0] diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 86e4ce1..5fdb27e 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -113,22 +113,6 @@ class NodeRoutes(db.Routes, FrontRoutes): content_type='application/json', content=release) return blob.digest - @route('PUT', [None, None], cmd='attach', acl=ACL.AUTH | ACL.SUPERUSER) - def attach(self, request): - # TODO Reading layer here is a race - directory = self.volume[request.resource] - doc = directory.get(request.guid) - layer = list(set(doc['layer']) | set(request.content)) - directory.update(request.guid, {'layer': layer}) - - @route('PUT', [None, None], cmd='detach', acl=ACL.AUTH | ACL.SUPERUSER) - def detach(self, request): - # TODO Reading layer here is a race - directory = self.volume[request.resource] - doc = directory.get(request.guid) - layer = list(set(doc['layer']) - set(request.content)) - directory.update(request.guid, {'layer': layer}) - @route('GET', ['context', None], cmd='solve', arguments={'requires': list, 'stability': list}, mime_type='application/json') diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 67ee7da..70868c0 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -21,7 +21,6 @@ import shutil import logging import tempfile import collections -from copy import deepcopy from cStringIO import StringIO from os.path import exists, join, islink, isdir, dirname, basename, abspath from os.path import lexists, isfile @@ -497,14 +496,12 @@ class Bin(object): def __init__(self, path, default_value=None): self._path = abspath(path) self.value = default_value - self._orig_value = None if exists(self._path): with file(self._path) as f: self.value = json.load(f) else: self.commit() - self._orig_value = deepcopy(self.value) @property def mtime(self): @@ -514,27 +511,33 @@ class Bin(object): return 0 def commit(self): - """Store current value in a file. - - :returns: - `True` if commit was happened - - """ - if self.value == self._orig_value: - return False + """Store current value in a file.""" with new_file(self._path) as f: json.dump(self.value, f) f.flush() os.fsync(f.fileno()) - self._orig_value = deepcopy(self.value) - return True def __enter__(self): - return self + return self.value def __exit__(self, exc_type, exc_value, traceback): self.commit() + def __contains__(self, key): + return key in self.value + + def __getitem__(self, key): + return self.value.get(key) + + def __setitem__(self, key, value): + self.value[key] = value + + def __delitem__(self, key): + del self.value[key] + + def __getattr__(self, name): + return getattr(self.value, name) + class Seqno(Bin): """Sequence number counter with persistent storing in a file.""" diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 254e6f3..9b9754e 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -112,8 +112,8 @@ class Connection(object): _Session = None - def __init__(self, api='', auth=None, max_retries=0, **session_args): - self.api = api + def __init__(self, url='', auth=None, max_retries=0, **session_args): + self.url = url self.auth = auth self._max_retries = max_retries self._session_args = session_args @@ -121,7 +121,7 @@ class Connection(object): self._nonce = None def __repr__(self): - return '<Connection api=%s>' % self.api + return '<Connection url=%s>' % self.url def __enter__(self): return self @@ -183,13 +183,10 @@ class Connection(object): finally: if isinstance(dst, basestring): f.close() + return reply def upload(self, path, data, **kwargs): - if isinstance(data, basestring): - with file(data, 'rb') as f: - reply = self.request('POST', path, f, params=kwargs) - else: - reply = self.request('POST', path, data, params=kwargs) + reply = self.request('POST', path, data, params=kwargs) if reply.headers.get('Content-Type') == 'application/json': return json.loads(reply.content) else: @@ -203,7 +200,7 @@ class Connection(object): if not path: path = [''] if not isinstance(path, basestring): - path = '/'.join([i.strip('/') for i in [self.api] + path]) + path = '/'.join([i.strip('/') for i in [self.url] + path]) try_ = 0 while True: @@ -283,7 +280,7 @@ class Connection(object): break path = reply.headers['location'] if path.startswith('/'): - path = self.api + path + path = self.url + path if request.method != 'HEAD': if reply.headers.get('Content-Type') == 'application/json': @@ -432,7 +429,7 @@ class _Subscription(object): if try_ == 0: raise toolkit.exception('Failed to read from %r subscription, ' - 'will resubscribe', self._client.api) + 'will resubscribe', self._client.url) self._content = None return _parse_event(line) @@ -441,7 +438,7 @@ class _Subscription(object): return self._content params.update(self._condition) params['cmd'] = 'subscribe' - _logger.debug('Subscribe to %r, %r', self._client.api, params) + _logger.debug('Subscribe to %r, %r', self._client.url, params) response = self._client.request('GET', params=params) self._content = response.raw return self._content diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py index 43e6960..f09bdb5 100644 --- a/sugar_network/toolkit/parcel.py +++ b/sugar_network/toolkit/parcel.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 48a04fe..8e23863 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -118,14 +118,13 @@ class Unauthorized(http.Unauthorized): class Request(dict): def __init__(self, environ=None, method=None, path=None, cmd=None, - content=None, content_stream=None, content_type=None, session=None, + content=None, content_stream=None, content_type=None, principal=None, **kwargs): dict.__init__(self) self.path = [] self.cmd = None self.environ = {} - self.session = session or {} self.principal = principal self._content = _NOT_SET @@ -756,15 +755,17 @@ class Router(object): commons['prop'] = request.prop try: for event in _event_stream(request, stream): - event.update(commons) - this.localcast(event) + if 'event' not in event: + commons.update(event) + else: + event.update(commons) + this.localcast(event) except Exception, error: _logger.exception('Event stream %r failed', request) event = {'event': 'failure', 'exception': type(error).__name__, 'error': str(error), } - event.update(request.session) event.update(commons) this.localcast(event) |