diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-24 11:55:25 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-24 11:55:25 (GMT) |
commit | 6ec16441c7c133c55385613f1e430c5ea37af632 (patch) | |
tree | 51870c8fa43a3bcabc6918206b3fc5265a91300a /sugar_network | |
parent | 40021927aa1815dd54e2e7839a46e5bd1ae8c7b3 (diff) |
Fix basic client routes
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/client/__init__.py | 7 | ||||
-rw-r--r-- | sugar_network/client/injector.py | 113 | ||||
-rw-r--r-- | sugar_network/client/journal.py | 13 | ||||
-rw-r--r-- | sugar_network/client/model.py | 36 | ||||
-rw-r--r-- | sugar_network/client/routes.py | 293 | ||||
-rw-r--r-- | sugar_network/db/directory.py | 36 | ||||
-rw-r--r-- | sugar_network/db/metadata.py | 3 | ||||
-rw-r--r-- | sugar_network/db/resource.py | 62 | ||||
-rw-r--r-- | sugar_network/db/routes.py | 130 | ||||
-rw-r--r-- | sugar_network/db/volume.py | 24 | ||||
-rw-r--r-- | sugar_network/model/__init__.py | 30 | ||||
-rw-r--r-- | sugar_network/model/context.py | 46 | ||||
-rw-r--r-- | sugar_network/model/post.py | 3 | ||||
-rw-r--r-- | sugar_network/model/report.py | 3 | ||||
-rw-r--r-- | sugar_network/model/routes.py | 1 | ||||
-rw-r--r-- | sugar_network/node/master.py | 10 | ||||
-rw-r--r-- | sugar_network/node/model.py | 22 | ||||
-rw-r--r-- | sugar_network/node/routes.py | 16 | ||||
-rw-r--r-- | sugar_network/node/slave.py | 23 | ||||
-rw-r--r-- | sugar_network/toolkit/coroutine.py | 5 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 3 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 36 |
22 files changed, 481 insertions, 434 deletions
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py index 446795a..648d418 100644 --- a/sugar_network/client/__init__.py +++ b/sugar_network/client/__init__.py @@ -81,11 +81,6 @@ hub_root = Option( 'from file:// url', default='/usr/share/sugar-network/hub') -layers = Option( - 'comma separated list of layers to restrict Sugar Network content by', - default=[], type_cast=Option.list_cast, type_repr=Option.list_repr, - name='layers') - discover_node = Option( 'discover nodes in local network instead of using --api', default=False, type_cast=Option.bool_cast, @@ -179,7 +174,7 @@ def Connection(url=None, **args): def IPCConnection(): return http.Connection( - api='http://127.0.0.1:%s' % ipc_port.value, + 'http://127.0.0.1:%s' % ipc_port.value, # Online ipc->client->node request might fail if node connection # is lost in client process, so, re-send ipc request immediately # to retrive data from client in offline mode without propagating diff --git a/sugar_network/client/injector.py b/sugar_network/client/injector.py index 12baf51..6d0c420 100644 --- a/sugar_network/client/injector.py +++ b/sugar_network/client/injector.py @@ -27,6 +27,7 @@ from sugar_network import toolkit from sugar_network.client import packagekit, journal, profile_path from sugar_network.toolkit.spec import format_version from sugar_network.toolkit.bundle import Bundle +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import lsb_release, coroutine, i18n, pylru, http from sugar_network.toolkit import enforce @@ -47,6 +48,7 @@ class Injector(object): limit_bytes, limit_percent) self._api = None self._checkins = toolkit.Bin(join(root, 'checkins'), {}) + self._inprogress = {} for dir_name in ('solutions', 'releases'): dir_path = join(root, dir_name) @@ -81,8 +83,8 @@ class Injector(object): yield {'event': 'launch', 'state': 'init'} releases = [] - acquired = [] checkedin = {} + inprogress = [] environ = {} def acquire(ctx): @@ -92,8 +94,8 @@ class Injector(object): if ctx in self._checkins: checkedin[ctx] = (self.api, stability, self.seqno) else: - _logger.debug('Acquire %r', ctx) - acquired.extend(solution.values()) + inprogress.append((ctx, solution)) + self._progress_in(ctx) releases.extend(solution.values()) release = solution[ctx] return release, self._pool.path(release['blob']) @@ -135,9 +137,9 @@ class Injector(object): yield environ status = child.wait() finally: - if acquired: - _logger.debug('Release acquired contexts') - self._pool.push(acquired) + for ctx, solution in inprogress: + self._progress_out(ctx, True) + self._pool.push(solution.values()) if checkedin: with self._checkins as checkins: @@ -148,19 +150,22 @@ class Injector(object): yield {'event': 'launch', 'state': 'exit'} def checkin(self, context, stability='stable'): - if context in self._checkins: - _logger.debug('Refresh %r checkin', context) - else: - _logger.debug('Checkin %r', context) - yield {'event': 'checkin', 'state': 'solve'} - solution = self._solve(context, stability) - for event in self._download(solution.values()): - event['event'] = 'checkin' - yield event - self._pool.pop(solution.values()) - with self._checkins as checkins: - checkins[context] = (self.api, stability, self.seqno) - yield {'event': 'checkin', 'state': 'ready'} + self._progress_in(context) + try: + yield {'event': 'checkin', 'state': 'solve'} + solution = self._solve(context, stability) + for event in self._download(solution.values()): + event['event'] = 'checkin' + yield event + self._pool.pop(solution.values()) + with self._checkins as checkins: + checkins[context] = (self.api, stability, self.seqno) + yield {'event': 'checkin', 'state': 'ready'} + directory = this.volume['context'] + pins = list(set(directory[context]['pins']) | set(['checkin'])) + directory.update(context, {'pins': pins}) + finally: + self._progress_out(context) def checkout(self, context): if context not in self._checkins: @@ -171,8 +176,51 @@ class Injector(object): self._pool.push(solution.values()) with self._checkins as checkins: del checkins[context] + directory = this.volume['context'] + pins = list(set(directory[context]['pins']) - set(['checkin'])) + directory.update(context, {'pins': pins}) + self._notify(context) return True + def pins(self, context, stability='stable'): + result = [] + if self.api and context in self._checkins: + api, s, seqno = self._checkins[context] + if api != self.api or s != stability or seqno != self.seqno: + result.append('stale') + if self._inprogress.get(context): + result.append('inprogress') + return result + + def _notify(self, context, force=False): + if not force and not self.api: + return + doc = this.volume['context'][context] + pins = doc.repr('pins') if doc.exists else self.pins(context) + this.localcast({ + 'event': 'update', + 'resource': 'context', + 'guid': context, + 'props': {'pins': pins}, + }) + + def _progress_in(self, context): + progress = self._inprogress.setdefault(context, 0) + self._inprogress[context] = progress + 1 + if not progress: + _logger.debug('%r is in-progress', context) + self._notify(context, True) + + def _progress_out(self, context, force=False): + progress = self._inprogress.get(context) + if not progress: + _logger.warn('Progress counter broken for %r', context) + return + self._inprogress[context] = progress - 1 + if progress == 1: + _logger.debug('%r is not in-progress', context) + self._notify(context, force) + def _solve(self, context, stability): path = join(self._root, 'solutions', context) solution = None @@ -193,7 +241,8 @@ class Injector(object): _logger.debug('Reuse cached %r solution in offline', context) if not solution: - enforce(self.api, 'Cannot solve in offline') + enforce(self.api, http.ServiceUnavailable, + 'Not available in offline') _logger.debug('Solve %r', context) solution = self._api.get(['context', context], cmd='solve', stability=stability, lsb_id=lsb_release.distributor_id(), @@ -422,21 +471,21 @@ def _exec(context, release, path, args, environ): os.chdir(path) - environ = os.environ - environ['PATH'] = ':'.join([ + env = os.environ + env['PATH'] = ':'.join([ join(path, 'activity'), join(path, 'bin'), - environ['PATH'], + env['PATH'], ]) - environ['PYTHONPATH'] = path + ':' + environ.get('PYTHONPATH', '') - environ['SUGAR_BUNDLE_PATH'] = path - environ['SUGAR_BUNDLE_ID'] = context - environ['SUGAR_BUNDLE_NAME'] = i18n.decode(release['title']) - environ['SUGAR_BUNDLE_VERSION'] = format_version(release['version']) - environ['SUGAR_ACTIVITY_ROOT'] = datadir - environ['SUGAR_LOCALEDIR'] = join(path, 'locale') - - os.execvpe(args[0], args, environ) + env['PYTHONPATH'] = path + ':' + env.get('PYTHONPATH', '') + env['SUGAR_BUNDLE_PATH'] = path + env['SUGAR_BUNDLE_ID'] = context + env['SUGAR_BUNDLE_NAME'] = i18n.decode(release['title']) + env['SUGAR_BUNDLE_VERSION'] = format_version(release['version']) + env['SUGAR_ACTIVITY_ROOT'] = datadir + env['SUGAR_LOCALEDIR'] = join(path, 'locale') + + os.execvpe(args[0], args, env) except BaseException: logging.exception('Failed to execute %r args=%r', release, args) finally: diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py index 0dcae12..6a8f5ed 100644 --- a/sugar_network/client/journal.py +++ b/sugar_network/client/journal.py @@ -19,8 +19,8 @@ import logging from shutil import copyfileobj from tempfile import NamedTemporaryFile -from sugar_network import client, toolkit -from sugar_network.toolkit.router import route, Request +from sugar_network import client +from sugar_network.toolkit.router import route, Request, File from sugar_network.toolkit import enforce @@ -105,14 +105,15 @@ class Routes(object): @route('GET', ['journal', None, 'preview']) def journal_get_preview(self, request, response): - return toolkit.File(_prop_path(request.guid, 'preview'), { - 'mime_type': 'image/png', + return File(_prop_path(request.guid, 'preview'), meta={ + 'content-type': 'image/png', }) @route('GET', ['journal', None, 'data']) def journal_get_data(self, request, response): - return toolkit.File(_ds_path(request.guid, 'data'), { - 'mime_type': get(request.guid, 'mime_type') or 'application/octet', + return File(_ds_path(request.guid, 'data'), meta={ + 'content-type': get(request.guid, 'mime_type') or + 'application/octet', }) @route('GET', ['journal', None, None], mime_type='application/json') diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py new file mode 100644 index 0000000..6207af2 --- /dev/null +++ b/sugar_network/client/model.py @@ -0,0 +1,36 @@ +# Copyright (C) 2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging + +from sugar_network import db +from sugar_network.model.user import User +from sugar_network.model.post import Post +from sugar_network.model.report import Report +from sugar_network.model import context as base_context +from sugar_network.toolkit.coroutine import this + + +_logger = logging.getLogger('client.model') + + +class Context(base_context.Context): + + @db.indexed_property(db.List, prefix='RP', default=[]) + def pins(self, value): + return value + this.injector.pins(self.guid) + + +RESOURCES = (User, Context, Post, Report) diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index 50d8632..c4b645d 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -17,66 +17,53 @@ import os import logging from base64 import b64encode from httplib import IncompleteRead -from zipfile import ZipFile, ZIP_DEFLATED -from os.path import join, basename +from os.path import join from sugar_network import db, client, node, toolkit, model -from sugar_network.client import journal, releases -from sugar_network.node.slave import SlaveRoutes -from sugar_network.toolkit import netlink, mountpoints +from sugar_network.client import journal from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.router import ACL, Request, Response, Router from sugar_network.toolkit.router import route, fallbackroute -from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce +from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel +from sugar_network.toolkit import lsb_release, exception, enforce -# Top-level directory name to keep SN data on mounted devices -_SN_DIRNAME = 'sugar-network' # Flag file to recognize a directory as a synchronization directory -_SYNC_DIRNAME = 'sugar-network-sync' _RECONNECT_TIMEOUT = 3 _RECONNECT_TIMEOUT_MAX = 60 * 15 -_LOCAL_LAYERS = frozenset(['local', 'clone', 'favorite']) _logger = logging.getLogger('client.routes') -class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): +class ClientRoutes(model.FrontRoutes, journal.Routes): - def __init__(self, home_volume, api_url=None, no_subscription=False): + def __init__(self, home_volume, no_subscription=False): model.FrontRoutes.__init__(self) - releases.Routes.__init__(self, home_volume) journal.Routes.__init__(self) + this.localcast = this.broadcast + self._local = _LocalRoutes(home_volume) self._inline = coroutine.Event() self._inline_job = coroutine.Pool() self._remote_urls = [] self._node = None - self._jobs = coroutine.Pool() + self._connect_jobs = coroutine.Pool() self._no_subscription = no_subscription - self._server_mode = not api_url - self._api_url = api_url self._auth = _Auth() - if not client.delayed_start.value: - self.connect() - - def connect(self): - self._got_offline(force=True) - if self._server_mode: - enforce(not client.login.value) - mountpoints.connect(_SN_DIRNAME, - self._found_mount, self._lost_mount) + def connect(self, api=None): + if self._connect_jobs: + return + self._got_offline() + if not api: + self._connect_jobs.spawn(self._discover_node) else: - if client.discover_server.value: - self._jobs.spawn(self._discover_node) - else: - self._remote_urls.append(self._api_url) - self._jobs.spawn(self._wait_for_connectivity) + self._remote_urls.append(api) + self._connect_jobs.spawn(self._wait_for_connectivity) def close(self): - self._jobs.kill() + self._connect_jobs.kill() self._got_offline() self._local.volume.close() @@ -132,63 +119,89 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): return result @route('GET', [None], - arguments={ - 'offset': int, - 'limit': int, - 'reply': ('guid',), - 'layer': list, - }, + arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, mime_type='application/json') - def find(self, request, response, layer): - if set(request.get('layer', [])) & set(['favorite', 'clone']): + def find(self, request, response): + if not self._inline.is_set() or 'pins' in request: return self._local.call(request, response) reply = request.setdefault('reply', ['guid']) - if 'layer' not in reply: + if 'pins' not in reply: return self.fallback(request, response) if 'guid' not in reply: - # Otherwise there is no way to mixin local `layer` + # Otherwise there is no way to mixin `pins` reply.append('guid') result = self.fallback(request, response) directory = self._local.volume[request.resource] for item in result['result']: - if directory.exists(item['guid']): - existing_layer = directory.get(item['guid'])['layer'] - item['layer'][:] = set(item['layer']) | set(existing_layer) + doc = directory[item['guid']] + if doc.exists: + item['pins'] += doc.repr('pins') return result @route('GET', [None, None], mime_type='application/json') def get(self, request, response): - if self._local.volume[request.resource].exists(request.guid): + if self._local.volume[request.resource][request.guid].exists: return self._local.call(request, response) else: return self.fallback(request, response) @route('GET', [None, None, None], mime_type='application/json') def get_prop(self, request, response): - if self._local.volume[request.resource].exists(request.guid): + if self._local.volume[request.resource][request.guid].exists: return self._local.call(request, response) else: return self.fallback(request, response) @route('POST', ['report'], cmd='submit', mime_type='text/event-stream') - def submit_report(self, request, response): - logs = request.content.pop('logs') + def submit_report(self): + props = this.request.content + logs = props.pop('logs') + props['uname'] = os.uname() + props['lsb_release'] = { + 'distributor_id': lsb_release.distributor_id(), + 'release': lsb_release.release(), + } guid = self.fallback(method='POST', path=['report'], - content=request.content, content_type='application/json') - if logs: - with toolkit.TemporaryFile() as tmpfile: - with ZipFile(tmpfile, 'w', ZIP_DEFLATED) as zipfile: - for path in logs: - zipfile.write(path, basename(path)) - tmpfile.seek(0) - self.fallback(method='PUT', path=['report', guid, 'data'], - content_stream=tmpfile, content_type='application/zip') + content=props, content_type='application/json') + for logfile in logs: + with file(logfile) as f: + self.fallback(method='POST', path=['report', guid, 'logs'], + content_stream=f, content_type='text/plain') yield {'event': 'done', 'guid': guid} + @route('GET', ['context', None], cmd='launch', arguments={'args': list}, + mime_type='text/event-stream') + def launch(self): + return this.injector.launch(this.request.guid, **this.request) + + @route('PUT', ['context', None], cmd='checkin', + mime_type='text/event-stream') + def put_checkin(self): + self._checkin_context() + for event in this.injector.checkin(this.request.guid): + yield event + + @route('DELETE', ['context', None], cmd='checkin') + def delete_checkin(self, request): + this.injector.checkout(this.request.guid) + self._checkout_context() + + @route('PUT', ['context', None], cmd='favorite') + def put_favorite(self, request): + self._checkin_context('favorite') + + @route('DELETE', ['context', None], cmd='favorite') + def delete_favorite(self, request): + self._checkout_context('favorite') + + @route('GET', cmd='recycle') + def recycle(self): + return this.injector.recycle() + @fallbackroute() def fallback(self, request=None, response=None, **kwargs): if request is None: @@ -199,8 +212,6 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): if not self._inline.is_set(): return self._local.call(request, response) - if client.layers.value and request.resource in ('context', 'release'): - request.add('layer', *client.layers.value) request.principal = self._auth.login try: reply = self._node.call(request, response) @@ -217,21 +228,23 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): self._restart_online() return self._local.call(request, response) - def _got_online(self): + def _got_online(self, url): enforce(not self._inline.is_set()) _logger.debug('Got online on %r', self._node) self._inline.set() + self._local.volume.mute = True + this.injector.api = url this.localcast({'event': 'inline', 'state': 'online'}) - def _got_offline(self, force=False): - if not force and not self._inline.is_set(): - return + def _got_offline(self): if self._node is not None: self._node.close() if self._inline.is_set(): _logger.debug('Got offline on %r', self._node) - this.localcast({'event': 'inline', 'state': 'offline'}) self._inline.clear() + self._local.volume.mute = False + this.injector.api = None + this.localcast({'event': 'inline', 'state': 'offline'}) def _restart_online(self): _logger.debug('Lost %r connection, try to reconnect in %s seconds', @@ -256,10 +269,8 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): def pull_events(): for event in self._node.subscribe(): - if event.get('resource') == 'release': - mtime = event.get('mtime') - if mtime: - self.invalidate_solutions(mtime) + if event.get('event') == 'release': + this.injector.seqno = event['seqno'] this.broadcast(event) def handshake(url): @@ -267,16 +278,13 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): self._node = client.Connection(url, auth=self._auth) status = self._node.get(cmd='status') self._auth.allow_basic_auth = (status.get('level') == 'master') - """ - TODO switch to seqno - impl_info = status['resources'].get('release') - if impl_info: - self.invalidate_solutions(impl_info['mtime']) - """ + seqno = status.get('seqno') + if seqno and 'releases' in seqno: + this.injector.seqno = seqno['releases'] if self._inline.is_set(): _logger.info('Reconnected to %r node', url) else: - self._got_online() + self._got_online(url) def connect(): timeout = _RECONNECT_TIMEOUT @@ -307,40 +315,32 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes): self._inline_job.kill() self._inline_job.spawn_later(timeout, connect) - def _found_mount(self, root): - if self._inline.is_set(): - _logger.debug('Found %r node mount but %r is already active', - root, self._node.volume.root) - return - - _logger.debug('Found %r node mount', root) - - db_path = join(root, _SN_DIRNAME, 'db') - node.data_root.value = db_path - node.stats_root.value = join(root, _SN_DIRNAME, 'stats') - node.files_root.value = join(root, _SN_DIRNAME, 'files') - volume = db.Volume(db_path, model.RESOURCES) - - if not volume['user'].exists(self._auth.login): - profile = self._auth.profile() - profile['guid'] = self._auth.login - volume['user'].create(profile) - - self._node = _NodeRoutes(join(db_path, 'node'), volume) - self._jobs.spawn(volume.populate) - - logging.info('Start %r node on %s port', volume.root, node.port.value) - server = coroutine.WSGIServer(('0.0.0.0', node.port.value), self._node) - self._inline_job.spawn(server.serve_forever) - self._got_online() - - def _lost_mount(self, root): - if not self._inline.is_set() or \ - not self._node.volume.root.startswith(root): + def _checkin_context(self, pin=None): + context = this.volume['context'][this.request.guid] + if not context.exists: + enforce(self.inline(), http.ServiceUnavailable, + 'Not available in offline') + _logger.debug('Checkin %r context', context.guid) + clone = self.fallback( + method='GET', path=['context', context.guid], cmd='clone') + this.volume.patch(next(parcel.decode(clone))) + pins = context['pins'] + if pin and pin not in pins: + this.volume['context'].update(context.guid, {'pins': pins + [pin]}) + + def _checkout_context(self, pin=None): + directory = this.volume['context'] + context = directory[this.request.guid] + if not context.exists: return - _logger.debug('Lost %r node mount', root) - self._inline_job.kill() - self._got_offline() + pins = set(context.repr('pins')) + if pin: + pins -= set([pin]) + if not self._inline.is_set() or pins: + if pin: + directory.update(context.guid, {'pins': list(pins)}) + else: + directory.delete(context.guid) class CachedClientRoutes(ClientRoutes): @@ -351,16 +351,16 @@ class CachedClientRoutes(ClientRoutes): self._push_job = coroutine.Pool() ClientRoutes.__init__(self, home_volume, api_url, no_subscription) - def _got_online(self): - ClientRoutes._got_online(self) + def _got_online(self, url): + ClientRoutes._got_online(self, url) self._push_job.spawn(self._push) - def _got_offline(self, force=True): + def _got_offline(self): self._push_job.kill() - ClientRoutes._got_offline(self, force) + ClientRoutes._got_offline(self) def _push(self): - # TODO should work using regular pull/push + # TODO should work using regular diff return @@ -384,14 +384,12 @@ class CachedClientRoutes(ClientRoutes): _logger.debug('Check %r local cache to push', res) - for guid, patch in volume[res].diff(self._push_seq, layer='local'): + for guid, patch in volume[res].diff(self._push_seq): diff = {} diff_seq = toolkit.Sequence() post_requests = [] for prop, meta, seqno in patch: value = meta['value'] - if prop == 'layer': - value = list(set(value) - _LOCAL_LAYERS) diff[prop] = value diff_seq.include(seqno, seqno) if not diff: @@ -436,67 +434,6 @@ class _LocalRoutes(db.Routes, Router): db.Routes.__init__(self, volume) Router.__init__(self, self) - def on_create(self, request, props): - props['layer'] = tuple(props['layer']) + ('local',) - db.Routes.on_create(self, request, props) - - -class _NodeRoutes(SlaveRoutes, Router): - - def __init__(self, key_path, volume): - SlaveRoutes.__init__(self, key_path, volume) - Router.__init__(self, self) - - self.api_url = 'http://127.0.0.1:%s' % node.port.value - self._mounts = toolkit.Pool() - self._jobs = coroutine.Pool() - - mountpoints.connect(_SYNC_DIRNAME, - self.__found_mountcb, self.__lost_mount_cb) - - def close(self): - self.volume.close() - - def __repr__(self): - return '<LocalNode path=%s api_url=%s>' % \ - (self.volume.root, self.api_url) - - def _sync_mounts(self): - this.localcast({'event': 'sync_start'}) - - for mountpoint in self._mounts: - this.localcast({'event': 'sync_next', 'path': mountpoint}) - try: - self._offline_session = self._offline_sync( - join(mountpoint, _SYNC_DIRNAME), - **(self._offline_session or {})) - except Exception, error: - _logger.exception('Failed to complete synchronization') - this.localcast({'event': 'sync_abort', 'error': str(error)}) - self._offline_session = None - raise - - if self._offline_session is None: - _logger.debug('Synchronization completed') - this.localcast({'event': 'sync_complete'}) - else: - _logger.debug('Postpone synchronization with %r session', - self._offline_session) - this.localcast({'event': 'sync_paused'}) - - def __found_mountcb(self, path): - self._mounts.add(path) - if self._jobs: - _logger.debug('Found %r sync mount, pool it', path) - else: - _logger.debug('Found %r sync mount, start synchronization', path) - self._jobs.spawn(self._sync_mounts) - - def __lost_mount_cb(self, path): - if self._mounts.remove(path) == toolkit.Pool.ACTIVE: - _logger.warning('%r was unmounted, break synchronization', path) - self._jobs.kill() - class _ResponseStream(object): diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 9ebf907..7fe127d 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -20,8 +20,7 @@ from os.path import exists, join from sugar_network import toolkit from sugar_network.db.storage import Storage from sugar_network.db.metadata import Metadata, Guid -from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, exception, enforce +from sugar_network.toolkit import exception, enforce # To invalidate existed index on stcuture changes @@ -32,7 +31,7 @@ _logger = logging.getLogger('db.directory') class Directory(object): - def __init__(self, root, resource, index_class, seqno): + def __init__(self, root, resource, index_class, seqno, broadcast): """ :param index_class: what class to use to access to indexes, for regular casses @@ -52,6 +51,7 @@ class Directory(object): self._seqno = seqno self._storage = None self._index = None + self._broadcast = broadcast self._open() @@ -92,10 +92,10 @@ class Directory(object): guid = props['guid'] = toolkit.uuid() _logger.debug('Create %s[%s]: %r', self.metadata.name, guid, props) event = {'event': 'create', 'guid': guid} - self._index.store(guid, props, self._prestore, self._broadcast, event) + self._index.store(guid, props, self._prestore, self.broadcast, event) return guid - def update(self, guid, props): + def update(self, guid, props, event='update'): """Update properties for an existing document. :param guid: @@ -105,8 +105,10 @@ class Directory(object): """ _logger.debug('Update %s[%s]: %r', self.metadata.name, guid, props) - event = {'event': 'update', 'guid': guid} - self._index.store(guid, props, self._prestore, self._broadcast, event) + event = {'event': event, 'guid': guid} + if event['event'] == 'update': + event['props'] = props.copy() + self._index.store(guid, props, self._prestore, self.broadcast, event) def delete(self, guid): """Delete document. @@ -119,15 +121,9 @@ class Directory(object): event = {'event': 'delete', 'guid': guid} self._index.delete(guid, self._postdelete, guid, event) - def exists(self, guid): - return self._storage.get(guid).consistent - def get(self, guid): cached_props = self._index.get_cached(guid) record = self._storage.get(guid) - enforce(cached_props or record.exists, http.NotFound, - 'Resource %r does not exist in %r', - guid, self.metadata.name) return self.resource(guid, record, cached_props) def __getitem__(self, guid): @@ -202,10 +198,14 @@ class Directory(object): if doc.post_seqno is not None and doc.exists: # No need in after-merge event, further commit event # is enough to avoid increasing events flow - self._index.store(guid, doc.origs, self._preindex) + self._index.store(guid, doc.posts, self._preindex) return seqno + def broadcast(self, event): + event['resource'] = self.metadata.name + self._broadcast(event) + def _open(self): index_path = join(self._root, 'index', self.metadata.name) if self._is_layout_stale(): @@ -219,10 +219,6 @@ class Directory(object): self._storage = Storage(join(self._root, 'db', self.metadata.name)) _logger.debug('Open %r resource', self.resource) - def _broadcast(self, event): - event['resource'] = self.metadata.name - this.broadcast(event) - def _preindex(self, guid, changes): doc = self.resource(guid, self._storage.get(guid), changes) for prop in self.metadata: @@ -240,11 +236,11 @@ class Directory(object): def _postdelete(self, guid, event): self._storage.delete(guid) - self._broadcast(event) + self.broadcast(event) def _postcommit(self): self._seqno.commit() - self._broadcast({'event': 'commit', 'mtime': self._index.mtime}) + self.broadcast({'event': 'commit', 'mtime': self._index.mtime}) def _save_layout(self): path = join(self._root, 'index', self.metadata.name, 'layout') diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index 31cace1..67a6d13 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -374,6 +374,9 @@ class Aggregated(Composite): def subtypecast(self, value): return self._subtype.typecast(value) + def subreprcast(self, value): + return self._subtype.reprcast(value) + def subteardown(self, value): self._subtype.teardown(value) diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index d17637d..38c1ce4 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -13,14 +13,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import time + from sugar_network.db.metadata import indexed_property, Localized -from sugar_network.db.metadata import Numeric, List, Authors +from sugar_network.db.metadata import Numeric, List, Authors, Enum from sugar_network.db.metadata import Composite, Aggregated -from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.router import ACL from sugar_network.toolkit import ranges +STATES = ['active', 'deleted'] +STATUSES = ['featured'] + + class Resource(object): """Base class for all data classes.""" @@ -32,10 +37,13 @@ class Resource(object): def __init__(self, guid, record, origs=None, posts=None): self.origs = origs or {} self.posts = posts or {} - self.guid = guid - self.is_new = not bool(guid) self.record = record self._post_seqno = None + self._guid = guid + + @property + def guid(self): + return self._guid or self['guid'] @property def post_seqno(self): @@ -64,33 +72,34 @@ class Resource(object): def author(self, value): return value - @indexed_property(List, prefix='RL', default=[]) - def layer(self, value): - return value - - @layer.setter - def layer(self, value): - orig = self.orig('layer') - if 'deleted' in value: - if this.request.method != 'POST' and 'deleted' not in orig: - self.deleted() - elif this.request.method != 'POST' and 'deleted' in orig: - self.restored() + @indexed_property(Enum, STATES, prefix='RE', default=STATES[0], acl=0) + def state(self, value): return value @indexed_property(List, prefix='RT', full_text=True, default=[]) def tags(self, value): return value + @indexed_property(List, prefix='RU', default=[], acl=ACL.READ, + subtype=Enum(STATUSES)) + def status(self, value): + return value + + @indexed_property(List, prefix='RP', default=[]) + def pins(self, value): + return value + @property def exists(self): return self.record is not None and self.record.consistent - def deleted(self): - pass + def created(self): + ts = int(time.time()) + self.posts['ctime'] = ts + self.posts['mtime'] = ts - def restored(self): - pass + def updated(self): + self.posts['mtime'] = int(time.time()) def get(self, prop, default=None): """Get document's property value. @@ -128,6 +137,19 @@ class Resource(object): self.origs[prop.name] = value return value + def repr(self, prop): + """Get property value with applying output typecasts. + + Such property values should be used to return property + out from the system. + + """ + prop_ = self.metadata[prop] + value = prop_.reprcast(self.get(prop)) + if prop_.on_get is not None: + value = prop_.on_get(self, value) + return value + def properties(self, props): result = {} for i in props: diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index e1f190c..f319658 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -14,7 +14,6 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import re -import time import logging from contextlib import contextmanager @@ -22,7 +21,7 @@ from sugar_network import toolkit from sugar_network.db.metadata import Aggregated from sugar_network.toolkit.router import ACL, File, route, fallbackroute from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit import http, parcel, enforce _GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$') @@ -40,20 +39,17 @@ class Routes(object): @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') def create(self, request): with self._post(request, ACL.CREATE) as doc: - self.on_create(request, doc.posts) + doc.created() + if request.principal: + authors = doc.posts['author'] = {} + self._useradd(authors, request.principal, ACL.ORIGINAL) self.volume[request.resource].create(doc.posts) - self.after_post(doc) return doc['guid'] @route('GET', [None], - arguments={ - 'offset': int, - 'limit': int, - 'layer': [], - 'reply': ('guid',), - }, + arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, mime_type='application/json') - def find(self, request, reply, limit, layer): + def find(self, request, reply, limit): self._preget(request) if self._find_limit: if limit <= 0: @@ -62,27 +58,22 @@ class Routes(object): _logger.warning('The find limit is restricted to %s', self._find_limit) request['limit'] = self._find_limit - if 'deleted' in layer: - _logger.warning('Requesting "deleted" layer, will ignore') - layer.remove('deleted') documents, total = self.volume[request.resource].find( - not_layer='deleted', **request) + not_state='deleted', **request) result = [self._postget(request, i, reply) for i in documents] return {'total': total, 'result': result} @route('GET', [None, None], cmd='exists', mime_type='application/json') def exists(self, request): - directory = self.volume[request.resource] - return directory.exists(request.guid) + return self.volume[request.resource][request.guid].exists @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def update(self, request): with self._post(request, ACL.WRITE) as doc: if not doc.posts: return - self.on_update(request, doc.posts) + doc.updated() self.volume[request.resource].update(doc.guid, doc.posts) - self.after_post(doc) @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR) def update_prop(self, request): @@ -97,8 +88,12 @@ class Routes(object): def delete(self, request): # Node data should not be deleted immediately # to make master-slave synchronization possible - request.content = {'layer': 'deleted'} - self.update(request) + directory = self.volume[request.resource] + doc = directory[request.guid] + enforce(doc.exists, http.NotFound, 'Resource not found') + doc.posts['state'] = 'deleted' + doc.updated() + directory.update(doc.guid, doc.posts, 'delete') @route('GET', [None, None], arguments={'reply': list}, mime_type='application/json') @@ -111,26 +106,16 @@ class Routes(object): reply.append(prop.name) self._preget(request) doc = self.volume[request.resource].get(request.guid) - enforce('deleted' not in doc['layer'], http.NotFound, 'Deleted') + enforce(doc.exists and doc['state'] != 'deleted', http.NotFound, + 'Resource not found') return self._postget(request, doc, reply) @route('GET', [None, None, None], mime_type='application/json') def get_prop(self, request, response): directory = self.volume[request.resource] - doc = directory.get(request.guid) - - prop = directory.metadata[request.prop] - prop.assert_access(ACL.READ) - - meta = doc.meta(prop.name) - if meta: - value = meta['value'] - response.last_modified = meta['mtime'] - else: - value = prop.default - value = _get_prop(doc, prop, value) + directory.metadata[request.prop].assert_access(ACL.READ) + value = directory[request.guid].repr(request.prop) enforce(value is not File.AWAY, http.NotFound, 'No blob') - return value @route('HEAD', [None, None, None]) @@ -152,6 +137,20 @@ class Routes(object): def remove_from_aggprop(self, request): self._aggpost(request, ACL.REMOVE, request.key) + @route('GET', [None, None, None, None], mime_type='application/json') + def get_aggprop(self): + doc = self.volume[this.request.resource][this.request.guid] + prop = doc.metadata[this.request.prop] + prop.assert_access(ACL.READ) + enforce(isinstance(prop, Aggregated), http.BadRequest, + 'Property is not aggregated') + agg_value = doc[prop.name].get(this.request.key) + enforce(agg_value is not None, http.NotFound, + 'Aggregated item not found') + value = prop.subreprcast(agg_value['value']) + enforce(value is not File.AWAY, http.NotFound, 'No blob') + return value + @route('PUT', [None, None], cmd='useradd', arguments={'role': 0}, acl=ACL.AUTH | ACL.AUTHOR) def useradd(self, request, user, role): @@ -171,60 +170,50 @@ class Routes(object): del authors[user] directory.update(request.guid, {'author': authors}) + @route('GET', [None, None], cmd='clone') + def clone(self, request): + clone = self.volume.clone(request.resource, request.guid) + return parcel.encode([('push', None, clone)]) + @fallbackroute('GET', ['blobs']) def blobs(self): return this.volume.blobs.get(this.request.guid) - def on_create(self, request, props): - ts = int(time.time()) - props['ctime'] = ts - props['mtime'] = ts - - if request.principal: - authors = props['author'] = {} - self._useradd(authors, request.principal, ACL.ORIGINAL) - - def on_update(self, request, props): - props['mtime'] = int(time.time()) - def on_aggprop_update(self, request, prop, value): pass - def after_post(self, doc): - pass - @contextmanager def _post(self, request, access): content = request.content enforce(isinstance(content, dict), http.BadRequest, 'Invalid value') - directory = self.volume[request.resource] if access == ACL.CREATE: - doc = directory.resource(None, None) if 'guid' in content: # TODO Temporal security hole, see TODO guid = content['guid'] - enforce(not directory.exists(guid), - http.BadRequest, '%s already exists', guid) enforce(_GUID_RE.match(guid) is not None, http.BadRequest, 'Malformed %s GUID', guid) else: - doc.posts['guid'] = toolkit.uuid() - for name, prop in directory.metadata.items(): + guid = toolkit.uuid() + doc = self.volume[request.resource][guid] + enforce(not doc.exists, 'Resource already exists') + doc.posts['guid'] = guid + for name, prop in doc.metadata.items(): if name not in content and prop.default is not None: doc.posts[name] = prop.default else: - doc = directory.get(request.guid) + doc = self.volume[request.resource][request.guid] + enforce(doc.exists, 'Resource not found') this.resource = doc def teardown(new, old): for name, value in new.items(): if old.get(name) != value: - directory.metadata[name].teardown(value) + doc.metadata[name].teardown(value) try: for name, value in content.items(): - prop = directory.metadata[name] + prop = doc.metadata[name] prop.assert_access(access, doc.orig(name)) if value is None: doc.posts[name] = prop.default @@ -255,8 +244,7 @@ class Routes(object): def _postget(self, request, doc, props): result = {} for name in props: - prop = doc.metadata[name] - value = _get_prop(doc, prop, doc.get(name)) + value = doc.repr(name) if isinstance(value, File): value = value.url result[name] = value @@ -264,10 +252,9 @@ class Routes(object): def _useradd(self, authors, user, role): props = {} - - users = self.volume['user'] - if users.exists(user): - props['name'] = users.get(user)['name'] + user_doc = self.volume['user'][user] + if user_doc.exists: + props['name'] = user_doc['name'] role |= ACL.INSYSTEM else: role &= ~ACL.INSYSTEM @@ -316,15 +303,8 @@ class Routes(object): authors = aggvalue['author'] = {} role = ACL.ORIGINAL if request.principal in doc['author'] else 0 self._useradd(authors, request.principal, role) - props = {request.prop: {aggid: aggvalue}} - self.on_update(request, props) - self.volume[request.resource].update(request.guid, props) + doc.posts[request.prop] = {aggid: aggvalue} + doc.updated() + self.volume[request.resource].update(request.guid, doc.posts) return aggid - - -def _get_prop(doc, prop, value): - value = prop.reprcast(value) - if prop.on_get is not None: - value = prop.on_get(doc, value) - return value diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 5d9bac1..7bf738c 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -19,9 +19,11 @@ from copy import deepcopy from os.path import exists, join, abspath from sugar_network import toolkit +from sugar_network.db.metadata import Blob from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter from sugar_network.db.blobs import Blobs +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, coroutine, ranges, enforce @@ -35,6 +37,7 @@ class Volume(dict): def __init__(self, root, documents, index_class=None): Volume._flush_pool.append(self) self.resources = {} + self.mute = False self._populators = coroutine.Pool() if index_class is None: @@ -122,6 +125,17 @@ class Volume(dict): ranges.exclude(r, None, last_seqno) yield {'commit': commit_r} + def clone(self, resource, guid): + doc = self[resource][guid] + patch = doc.diff([[1, None]]) + if not patch: + return + for name, prop in self[resource].metadata.items(): + if isinstance(prop, Blob) and name in patch: + yield self.blobs.get(patch[name]['value']) + yield {'resource': resource} + yield {'guid': guid, 'patch': patch} + def patch(self, records): directory = None committed = [] @@ -150,6 +164,13 @@ class Volume(dict): return seqno, committed + def broadcast(self, event): + if not self.mute: + if event['event'] == 'commit': + this.broadcast(event) + else: + this.localcast(event) + def __enter__(self): return self @@ -167,7 +188,8 @@ class Volume(dict): cls = getattr(mod, name.capitalize()) else: cls = resource - dir_ = Directory(self._root, cls, self._index_class, self.seqno) + dir_ = Directory(self._root, cls, self._index_class, self.seqno, + self.broadcast) self._populators.spawn(self._populate, dir_) self[name] = dir_ return dir_ diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index 9e1aaf5..4ff89ff 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -122,7 +122,6 @@ def generate_node_stats(volume): def load_bundle(blob, context=None, initial=False, extra_deps=None): - contexts = this.volume['context'] context_type = None context_meta = None release_notes = None @@ -186,18 +185,21 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): enforce(context, http.BadRequest, 'Context is not specified') enforce(version, http.BadRequest, 'Version is not specified') release['version'] = parse_version(version) - if initial and not contexts.exists(context): - enforce(context_meta, http.BadRequest, 'No way to initate context') - context_meta['guid'] = context - context_meta['type'] = [context_type] - this.call(method='POST', path=['context'], content=context_meta) + + doc = this.volume['context'][context] + if initial: + if not doc.exists: + enforce(context_meta, http.BadRequest, 'No way to initate context') + context_meta['guid'] = context + context_meta['type'] = [context_type] + this.call(method='POST', path=['context'], content=context_meta) else: - enforce(context_type in contexts[context]['type'], + enforce(doc.exists, http.NotFound, 'No context') + enforce(context_type in doc['type'], http.BadRequest, 'Inappropriate bundle type') - context_doc = contexts[context] if 'license' not in release: - releases = context_doc['releases'].values() + releases = doc['releases'].values() enforce(releases, http.BadRequest, 'License is not specified') recent = max(releases, key=lambda x: x.get('value', {}).get('release')) enforce(recent, http.BadRequest, 'License is not specified') @@ -205,11 +207,11 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): _logger.debug('Load %r release: %r', context, release) - if this.request.principal in context_doc['author']: - patch = context_doc.format_patch(context_meta) + if this.request.principal in doc['author']: + patch = doc.format_patch(context_meta) if patch: this.call(method='PUT', path=['context', context], content=patch) - context_doc.posts.update(patch) + doc.posts.update(patch) # TRANS: Release notes title title = i18n._('%(name)s %(version)s release') else: @@ -220,7 +222,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): 'context': context, 'type': 'notification', 'title': i18n.encode(title, - name=context_doc['title'], + name=doc['title'], version=version, ), 'message': release_notes or '', @@ -228,7 +230,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): content_type='application/json') blob['content-disposition'] = 'attachment; filename="%s-%s%s"' % ( - ''.join(i18n.decode(context_doc['title']).split()), + ''.join(i18n.decode(doc['title']).split()), version, mimetypes.guess_extension(blob.get('content-type')) or '', ) this.volume.blobs.update(blob.digest, blob) diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py index 3aceacc..5e12360 100644 --- a/sugar_network/model/context.py +++ b/sugar_network/model/context.py @@ -21,7 +21,11 @@ from sugar_network.toolkit import svg_to_png class Context(db.Resource): - @db.indexed_property(db.List, prefix='T', full_text=True, + @db.indexed_property(db.List, prefix='P', default=[]) + def pins(self, value): + return value + + @db.indexed_property(db.List, prefix='T', subtype=db.Enum(model.CONTEXT_TYPES)) def type(self, value): return value @@ -74,7 +78,7 @@ class Context(db.Resource): def homepage(self, value): return value - @db.indexed_property(db.List, prefix='Y', default=[], full_text=True) + @db.indexed_property(db.List, prefix='Y', default=[]) def mime_types(self, value): return value @@ -90,7 +94,8 @@ class Context(db.Resource): def logo(self, value): return value - @db.stored_property(db.Aggregated, subtype=db.Blob()) + @db.stored_property(db.Aggregated, subtype=db.Blob(), + acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.AUTHOR) def previews(self, value): return value @@ -99,12 +104,6 @@ class Context(db.Resource): def releases(self, value): return value - @releases.setter - def releases(self, value): - if value or this.request.method != 'POST': - self.invalidate_solutions() - return value - @db.indexed_property(db.Numeric, slot=2, default=0, acl=ACL.READ | ACL.CALC) def downloads(self, value): @@ -124,20 +123,19 @@ class Context(db.Resource): """ return value - @dependencies.setter - def dependencies(self, value): - if value or this.request.method != 'POST': - self.invalidate_solutions() - return value - - def deleted(self): - self.invalidate_solutions() + def created(self): + db.Resource.created(self) + self._invalidate_solutions() - def restored(self): - self.invalidate_solutions() + def updated(self): + db.Resource.updated(self) + self._invalidate_solutions() - def invalidate_solutions(self): - this.broadcast({ - 'event': 'release', - 'seqno': this.volume.releases_seqno.next(), - }) + def _invalidate_solutions(self): + if self['releases'] and \ + [i for i in ('state', 'releases', 'dependencies') + if i in self.posts and self.posts[i] != self.orig(i)]: + this.broadcast({ + 'event': 'release', + 'seqno': this.volume.releases_seqno.next(), + }) diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py index 21046f2..d924617 100644 --- a/sugar_network/model/post.py +++ b/sugar_network/model/post.py @@ -74,7 +74,8 @@ class Post(db.Resource): def preview(self, value): return value - @db.stored_property(db.Aggregated, subtype=db.Blob()) + @db.stored_property(db.Aggregated, subtype=db.Blob(), + acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.AUTHOR) def attachments(self, value): if value: value['name'] = self['title'] diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py index be9fd9f..a434a6d 100644 --- a/sugar_network/model/report.py +++ b/sugar_network/model/report.py @@ -60,6 +60,7 @@ class Report(db.Resource): def solution(self, value): return value - @db.stored_property(db.Aggregated, subtype=db.Blob()) + @db.stored_property(db.Aggregated, subtype=db.Blob(), + acl=ACL.READ | ACL.INSERT | ACL.AUTHOR) def logs(self, value): return value diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index af19023..fb409d4 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -28,6 +28,7 @@ class FrontRoutes(object): def __init__(self): self._spooler = coroutine.Spooler() this.broadcast = self._broadcast + this.localcast = self._broadcast @route('GET', mime_type='text/html') def hello(self): diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index 61d32fb..b93dcbc 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -17,6 +17,9 @@ import logging from urlparse import urlsplit from sugar_network import toolkit +from sugar_network.model.post import Post +from sugar_network.model.report import Report +from sugar_network.node.model import User, Context from sugar_network.node import obs, master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL @@ -24,12 +27,7 @@ from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, parcel, pylru, ranges, enforce -RESOURCES = ( - 'sugar_network.node.model', - 'sugar_network.model.post', - 'sugar_network.model.report', - 'sugar_network.model.user', - ) +RESOURCES = (User, Context, Post, Report) _logger = logging.getLogger('node.master') diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 8de6038..8f9819b 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -14,10 +14,12 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import bisect +import hashlib import logging from sugar_network import db -from sugar_network.model import Release, context as base_context +from sugar_network.model import Release, context as _context, user as _user + from sugar_network.node import obs from sugar_network.toolkit.router import ACL from sugar_network.toolkit.coroutine import this @@ -28,6 +30,13 @@ _logger = logging.getLogger('node.model') _presolve_queue = None +class User(_user.User): + + def created(self): + with file(this.volume.blobs.get(self['pubkey']).path) as f: + self.posts['guid'] = str(hashlib.sha1(f.read()).hexdigest()) + + class _Release(Release): _package_cast = db.Dict(db.List()) @@ -87,23 +96,17 @@ class _Release(Release): def teardown(self, value): if 'package' not in this.resource['type']: - return Release.typecast(self, value) + return Release.teardown(self, value) # TODO Delete presolved files -class Context(base_context.Context): +class Context(_context.Context): @db.stored_property(db.Aggregated, subtype=_Release(), acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE) def releases(self, value): return value - @releases.setter - def releases(self, value): - if value or this.request.method != 'POST': - self.invalidate_solutions() - return value - def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, stability=None, requires=None): @@ -151,6 +154,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, if context in context_clauses: return context_clauses[context] context = volume['context'][context] + enforce(context.exists, http.NotFound, 'Context not found') releases = context['releases'] clause = [] diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 5fdb27e..ea23297 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -121,9 +121,9 @@ class NodeRoutes(db.Routes, FrontRoutes): enforce(solution is not None, 'Failed to solve') return solution - @route('GET', ['context', None], cmd='clone', - arguments={'requires': list}) - def get_clone(self, request, response): + @route('GET', ['context', None], cmd='resolve', + arguments={'requires': list, 'stability': list}) + def resolve(self, request): solution = self.solve(request) return this.volume.blobs.get(solution[request.guid]['blob']) @@ -149,12 +149,6 @@ class NodeRoutes(db.Routes, FrontRoutes): enforce(self.authorize(request.principal, 'root'), http.Forbidden, 'Operation is permitted only for superusers') - def on_create(self, request, props): - if request.resource == 'user': - with file(this.volume.blobs.get(props['pubkey']).path) as f: - props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) - db.Routes.on_create(self, request, props) - def on_aggprop_update(self, request, prop, value): if prop.acl & ACL.AUTHOR: self._enforce_authority(request) @@ -164,8 +158,8 @@ class NodeRoutes(db.Routes, FrontRoutes): def authenticate(self, auth): enforce(auth.scheme == 'sugar', http.BadRequest, 'Unknown authentication scheme') - if not self.volume['user'].exists(auth.login): - raise Unauthorized('Principal does not exist', auth.nonce) + enforce(self.volume['user'][auth.login].exists, Unauthorized, + 'Principal does not exist') from M2Crypto import RSA diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 333e6ea..76593e9 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -22,6 +22,10 @@ from os.path import join, dirname, exists, isabs from gettext import gettext as _ from sugar_network import toolkit +from sugar_network.model.context import Context +from sugar_network.model.post import Post +from sugar_network.model.report import Report +from sugar_network.node.model import User from sugar_network.node import master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL @@ -29,6 +33,8 @@ from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, parcel, ranges, enforce +RESOURCES = (User, Context, Post, Report) + _logger = logging.getLogger('node.slave') @@ -46,7 +52,6 @@ class SlaveRoutes(NodeRoutes): @route('POST', cmd='online_sync', acl=ACL.LOCAL, arguments={'no_pull': bool}) def online_sync(self, no_pull=False): - self._export(not no_pull) conn = http.Connection(master_api.value) response = conn.request('POST', data=parcel.encode(self._export(not no_pull), header={ @@ -100,22 +105,22 @@ class SlaveRoutes(NodeRoutes): seqno, committed = this.volume.patch(packet) if seqno is not None: if from_master: - ranges.exclude(self._pull_r.value, committed) - self._pull_r.commit() + with self._pull_r as r: + ranges.exclude(r, committed) else: requests.append(('request', { 'origin': sender, 'ranges': committed, }, [])) - ranges.exclude(self._push_r.value, seqno, seqno) - self._push_r.commit() + with self._push_r as r: + ranges.exclude(r, seqno, seqno) elif packet.name == 'ack' and from_master and \ packet['to'] == self.guid: - ranges.exclude(self._pull_r.value, packet['ack']) - self._pull_r.commit() + with self._pull_r as r: + ranges.exclude(r, packet['ack']) if packet['ranges']: - ranges.exclude(self._push_r.value, packet['ranges']) - self._push_r.commit() + with self._push_r as r: + ranges.exclude(r, packet['ranges']) return requests diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py index b43c1e9..4a54975 100644 --- a/sugar_network/toolkit/coroutine.py +++ b/sugar_network/toolkit/coroutine.py @@ -370,8 +370,9 @@ def _print_exception(context, klass, value, tb): context = 'Undefined' elif not isinstance(context, basestring): if isinstance(context, dict) and 'PATH_INFO' in context: - context_repr = '%s%s' % \ - (context['PATH_INFO'], context.get('QUERY_STRING') or '') + context_repr = context['PATH_INFO'] + if 'QUERY_STRING' in context: + context_repr += '?' + context['QUERY_STRING'] try: context = self.format_context(context) except Exception: diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 9b9754e..9dd437e 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -202,6 +202,9 @@ class Connection(object): if not isinstance(path, basestring): path = '/'.join([i.strip('/') for i in [self.url] + path]) + # TODO Disable cookies on requests library level + self._session.cookies.clear() + try_ = 0 while True: try_ += 1 diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 8e23863..8eb84da 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -602,11 +602,11 @@ class Router(object): request.ensure_content() coroutine.spawn(self._event_stream, request, result) result = None + elif route_.mime_type and 'content-type' not in response: + response.set('content-type', route_.mime_type) except Exception, exception: + # To populate `exception` only raise - else: - if route_.mime_type and 'content-type' not in response: - response.set('content-type', route_.mime_type) finally: for i in self._postroutes: i(request, response, result, exception) @@ -689,7 +689,8 @@ class Router(object): elif not streamed_content: if response.content_type == 'application/json': content = json.dumps(content) - if 'content-length' not in response: + response.content_length = len(content) + elif 'content-length' not in response: response.content_length = len(content) if content else 0 if request.method == 'HEAD' and content is not None: _logger.warning('Content from HEAD response is ignored') @@ -753,21 +754,12 @@ class Router(object): commons['guid'] = request.guid if request.prop: commons['prop'] = request.prop - try: - for event in _event_stream(request, stream): - if 'event' not in event: - commons.update(event) - else: - event.update(commons) - this.localcast(event) - except Exception, error: - _logger.exception('Event stream %r failed', request) - event = {'event': 'failure', - 'exception': type(error).__name__, - 'error': str(error), - } - event.update(commons) - this.localcast(event) + for event in _event_stream(request, stream): + if 'event' not in event: + commons.update(event) + else: + event.update(commons) + this.localcast(event) def _assert_origin(self, environ): origin = environ['HTTP_ORIGIN'] @@ -837,6 +829,12 @@ def _event_stream(request, stream): event[0].update(i) event = event[0] yield event + except Exception, error: + _logger.exception('Event stream %r failed', request) + yield {'event': 'failure', + 'exception': type(error).__name__, + 'error': str(error), + } finally: _logger.debug('Event stream %r exited', request) |