diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-08 11:49:53 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-14 16:06:48 (GMT) |
commit | 71391e654f497234fac0a4602bba769820aa521c (patch) | |
tree | 2b5d6d66a4b23f28581adc4079a1aa28f3907407 /sugar_network | |
parent | 6ec16441c7c133c55385613f1e430c5ea37af632 (diff) |
More implementation polishing
* suppress passing guids while creating objects;
* access to request/response objects via "this";
* represent File objects as url strings;
* sepparate auth code;
* patch Logger.exception instead of using standalone function;
* move releases seqno to node.Volume.
Diffstat (limited to 'sugar_network')
30 files changed, 715 insertions, 704 deletions
diff --git a/sugar_network/client/auth.py b/sugar_network/client/auth.py new file mode 100644 index 0000000..db95aa5 --- /dev/null +++ b/sugar_network/client/auth.py @@ -0,0 +1,112 @@ +# Copyright (C) 2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import hashlib +from base64 import b64encode +from urllib2 import parse_http_list, parse_keqv_list +from os.path import abspath, expanduser, dirname, exists + + +class BasicCreds(object): + + def __init__(self, login, password): + self._login = login + self._password = password + + @property + def login(self): + return self._login + + @property + def profile(self): + return None + + def logon(self, challenge): + creds = '%s:%s' % (self._login, self._password) + return {'authorization': 'Basic ' + b64encode(creds)} + + +class SugarCreds(object): + + def __init__(self, key_path): + self._key_path = abspath(expanduser(key_path)) + self._key = None + self._pubkey = None + self._login = None + + @property + def pubkey(self): + if self._pubkey is None: + self.ensure_key() + from M2Crypto.BIO import MemoryBuffer + buf = MemoryBuffer() + self._key.save_pub_key_bio(buf) + self._pubkey = buf.getvalue() + return self._pubkey + + @property + def login(self): + if self._login is None: + self._login = str(hashlib.sha1(self.pubkey).hexdigest()) + return self._login + + @property + def profile(self): + try: + import gconf + gconf_ = gconf.client_get_default() + name = gconf_.get_string('/desktop/sugar/user/nick') + except Exception: + name = self.login + return {'name': name, 'pubkey': self.pubkey} + + def logon(self, challenge): + self.ensure_key() + challenge = challenge.split(' ', 1)[-1] + nonce = parse_keqv_list(parse_http_list(challenge)).get('nonce') + data = hashlib.sha1('%s:%s' % (self.login, nonce)).digest() + signature = self._key.sign(data).encode('hex') + authorization = 'Sugar username="%s",nonce="%s",signature="%s"' % \ + (self.login, nonce, signature) + return {'authorization': authorization} + + def ensure_key(self): + from M2Crypto import RSA + + key_dir = dirname(self._key_path) + if exists(self._key_path): + if os.stat(key_dir).st_mode & 077: + os.chmod(key_dir, 0700) + self._key = RSA.load_key(self._key_path) + return + + if not exists(key_dir): + os.makedirs(key_dir) + os.chmod(key_dir, 0700) + + _logger.info('Generate RSA private key at %r', self._key_path) + self._key = RSA.gen_key(1024, 65537, lambda *args: None) + self._key.save_key(self._key_path, cipher=None) + os.chmod(self._key_path, 0600) + + pub_key_path = self._key_path + '.pub' + with file(pub_key_path, 'w') as f: + f.write('ssh-rsa %s %s@%s' % ( + b64encode('\x00\x00\x00\x07ssh-rsa%s%s' % self._key.pub()), + self.login, + os.uname()[1], + )) + _logger.info('Saved RSA public key at %r', pub_key_path) diff --git a/sugar_network/client/injector.py b/sugar_network/client/injector.py index 6d0c420..69dc06a 100644 --- a/sugar_network/client/injector.py +++ b/sugar_network/client/injector.py @@ -487,7 +487,7 @@ def _exec(context, release, path, args, environ): os.execvpe(args[0], args, env) except BaseException: - logging.exception('Failed to execute %r args=%r', release, args) + _logger.exception('Failed to execute %r args=%r', release, args) finally: os._exit(1) diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py index 6a8f5ed..5a6f894 100644 --- a/sugar_network/client/journal.py +++ b/sugar_network/client/journal.py @@ -21,6 +21,7 @@ from tempfile import NamedTemporaryFile from sugar_network import client from sugar_network.toolkit.router import route, Request, File +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import enforce @@ -63,11 +64,12 @@ class Routes(object): 'reply': ('uid', 'title', 'description', 'preview'), 'order_by': list, }) - def journal_find(self, request, response): + def journal_find(self): enforce(self._ds is not None, 'Journal is inaccessible') import dbus + request = this.request reply = request.pop('reply') if 'preview' in reply: reply.remove('preview') @@ -95,8 +97,8 @@ class Routes(object): return {'result': result, 'total': int(total)} @route('GET', ['journal', None], mime_type='application/json') - def journal_get(self, request, response): - guid = request.guid + def journal_get(self): + guid = this.request.guid return {'guid': guid, 'title': get(guid, 'title'), 'description': get(guid, 'description'), @@ -104,49 +106,49 @@ class Routes(object): } @route('GET', ['journal', None, 'preview']) - def journal_get_preview(self, request, response): - return File(_prop_path(request.guid, 'preview'), meta={ + def journal_get_preview(self): + return File(_prop_path(this.request.guid, 'preview'), meta={ 'content-type': 'image/png', }) @route('GET', ['journal', None, 'data']) - def journal_get_data(self, request, response): - return File(_ds_path(request.guid, 'data'), meta={ - 'content-type': get(request.guid, 'mime_type') or + def journal_get_data(self): + return File(_ds_path(this.request.guid, 'data'), meta={ + 'content-type': get(this.request.guid, 'mime_type') or 'application/octet', }) @route('GET', ['journal', None, None], mime_type='application/json') - def journal_get_prop(self, request, response): - return get(request.guid, request.prop) + def journal_get_prop(self): + return get(this.request.guid, this.request.prop) @route('PUT', ['journal', None], cmd='share') - def journal_share(self, request, response): + def journal_share(self): enforce(self._ds is not None, 'Journal is inaccessible') - guid = request.guid + guid = this.request.guid preview_path = _prop_path(guid, 'preview') enforce(os.access(preview_path, os.R_OK), 'No preview') data_path = _ds_path(guid, 'data') enforce(os.access(data_path, os.R_OK), 'No data') subrequest = Request(method='POST', document='artifact') - subrequest.content = request.content + subrequest.content = this.request.content subrequest.content_type = 'application/json' # pylint: disable-msg=E1101 - subguid = self.fallback(subrequest, response) + subguid = self.fallback(subrequest) subrequest = Request(method='PUT', document='artifact', guid=subguid, prop='preview') subrequest.content_type = 'image/png' with file(preview_path, 'rb') as subrequest.content_stream: - self.fallback(subrequest, response) + self.fallback(subrequest) subrequest = Request(method='PUT', document='artifact', guid=subguid, prop='data') subrequest.content_type = get(guid, 'mime_type') or 'application/octet' with file(data_path, 'rb') as subrequest.content_stream: - self.fallback(subrequest, response) + self.fallback(subrequest) def journal_update(self, guid, data=None, **kwargs): enforce(self._ds is not None, 'Journal is inaccessible') diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py index 6207af2..70c8f46 100644 --- a/sugar_network/client/model.py +++ b/sugar_network/client/model.py @@ -19,18 +19,25 @@ from sugar_network import db from sugar_network.model.user import User from sugar_network.model.post import Post from sugar_network.model.report import Report -from sugar_network.model import context as base_context +from sugar_network.model.context import Context as _Context from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit.router import ACL _logger = logging.getLogger('client.model') -class Context(base_context.Context): +class Context(_Context): - @db.indexed_property(db.List, prefix='RP', default=[]) + @db.indexed_property(db.List, prefix='RP', default=[], + acl=ACL.READ | ACL.LOCAL) def pins(self, value): return value + this.injector.pins(self.guid) -RESOURCES = (User, Context, Post, Report) +class Volume(db.Volume): + + def __init__(self, root): + db.Volume.__init__(self, root, [User, Context, Post, Report]) + for resource in ('user', 'context', 'post'): + self[resource].metadata['author'].acl |= ACL.LOCAL diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index c4b645d..f580789 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -15,17 +15,16 @@ import os import logging -from base64 import b64encode from httplib import IncompleteRead from os.path import join from sugar_network import db, client, node, toolkit, model from sugar_network.client import journal from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit.router import ACL, Request, Response, Router +from sugar_network.toolkit.router import Request, Router, File from sugar_network.toolkit.router import route, fallbackroute from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel -from sugar_network.toolkit import lsb_release, exception, enforce +from sugar_network.toolkit import ranges, lsb_release, enforce # Flag file to recognize a directory as a synchronization directory @@ -37,20 +36,24 @@ _logger = logging.getLogger('client.routes') class ClientRoutes(model.FrontRoutes, journal.Routes): - def __init__(self, home_volume, no_subscription=False): + def __init__(self, home_volume, creds, no_subscription=False): model.FrontRoutes.__init__(self) journal.Routes.__init__(self) this.localcast = this.broadcast self._local = _LocalRoutes(home_volume) + self._creds = creds self._inline = coroutine.Event() self._inline_job = coroutine.Pool() self._remote_urls = [] self._node = None self._connect_jobs = coroutine.Pool() self._no_subscription = no_subscription - self._auth = _Auth() + self._push_r = toolkit.Bin( + join(home_volume.root, 'var', 'push'), + [[1, None]]) + self._push_job = coroutine.Pool() def connect(self, api=None): if self._connect_jobs: @@ -68,36 +71,36 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): self._local.volume.close() @fallbackroute('GET', ['hub']) - def hub(self, request, response): + def hub(self): """Serve Hub via HTTP instead of file:// for IPC users. Since SSE doesn't support CORS for now. """ - if request.environ['PATH_INFO'] == '/hub': + if this.request.environ['PATH_INFO'] == '/hub': raise http.Redirect('/hub/') - path = request.path[1:] + path = this.request.path[1:] if not path: path = ['index.html'] path = join(client.hub_root.value, *path) mtime = int(os.stat(path).st_mtime) - if request.if_modified_since >= mtime: + if this.request.if_modified_since >= mtime: raise http.NotModified() if path.endswith('.js'): - response.content_type = 'text/javascript' + this.response.content_type = 'text/javascript' if path.endswith('.css'): - response.content_type = 'text/css' - response.last_modified = mtime + this.response.content_type = 'text/css' + this.response.last_modified = mtime return file(path, 'rb') @fallbackroute('GET', ['packages']) - def route_packages(self, request, response): + def route_packages(self): if self._inline.is_set(): - return self.fallback(request, response) + return self.fallback() else: # Let caller know that we are in offline and # no way to process specified request on the node @@ -109,30 +112,31 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): return self._inline.is_set() @route('GET', cmd='whoami', mime_type='application/json') - def whoami(self, request, response): + def whoami(self): if self._inline.is_set(): - result = self.fallback(request, response) + result = self.fallback() result['route'] = 'proxy' else: result = {'roles': [], 'route': 'offline'} - result['guid'] = self._auth.login + result['guid'] = self._creds.login return result @route('GET', [None], arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, mime_type='application/json') - def find(self, request, response): + def find(self): + request = this.request if not self._inline.is_set() or 'pins' in request: - return self._local.call(request, response) + return self._local.call(request, this.response) reply = request.setdefault('reply', ['guid']) if 'pins' not in reply: - return self.fallback(request, response) + return self.fallback() if 'guid' not in reply: # Otherwise there is no way to mixin `pins` reply.append('guid') - result = self.fallback(request, response) + result = self.fallback() directory = self._local.volume[request.resource] for item in result['result']: @@ -143,18 +147,20 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): return result @route('GET', [None, None], mime_type='application/json') - def get(self, request, response): + def get(self): + request = this.request if self._local.volume[request.resource][request.guid].exists: - return self._local.call(request, response) + return self._local.call(request, this.response) else: - return self.fallback(request, response) + return self.fallback() @route('GET', [None, None, None], mime_type='application/json') - def get_prop(self, request, response): + def get_prop(self): + request = this.request if self._local.volume[request.resource][request.guid].exists: - return self._local.call(request, response) + return self._local.call(request, this.response) else: - return self.fallback(request, response) + return self.fallback() @route('POST', ['report'], cmd='submit', mime_type='text/event-stream') def submit_report(self): @@ -186,16 +192,16 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): yield event @route('DELETE', ['context', None], cmd='checkin') - def delete_checkin(self, request): + def delete_checkin(self): this.injector.checkout(this.request.guid) self._checkout_context() @route('PUT', ['context', None], cmd='favorite') - def put_favorite(self, request): + def put_favorite(self): self._checkin_context('favorite') @route('DELETE', ['context', None], cmd='favorite') - def delete_favorite(self, request): + def delete_favorite(self): self._checkout_context('favorite') @route('GET', cmd='recycle') @@ -205,14 +211,13 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): @fallbackroute() def fallback(self, request=None, response=None, **kwargs): if request is None: - request = Request(**kwargs) + request = Request(**kwargs) if kwargs else this.request if response is None: - response = Response() + response = this.response if not self._inline.is_set(): return self._local.call(request, response) - request.principal = self._auth.login try: reply = self._node.call(request, response) if hasattr(reply, 'read'): @@ -235,6 +240,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): self._local.volume.mute = True this.injector.api = url this.localcast({'event': 'inline', 'state': 'online'}) + self._push_job.spawn(self._push) def _got_offline(self): if self._node is not None: @@ -245,6 +251,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): self._local.volume.mute = False this.injector.api = None this.localcast({'event': 'inline', 'state': 'offline'}) + self._push_job.kill() def _restart_online(self): _logger.debug('Lost %r connection, try to reconnect in %s seconds', @@ -275,9 +282,8 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): def handshake(url): _logger.debug('Connecting to %r node', url) - self._node = client.Connection(url, auth=self._auth) + self._node = client.Connection(url, creds=self._creds) status = self._node.get(cmd='status') - self._auth.allow_basic_auth = (status.get('level') == 'master') seqno = status.get('seqno') if seqno and 'releases' in seqno: this.injector.seqno = seqno['releases'] @@ -302,7 +308,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): _logger.debug('Retry %r on gateway error', url) continue except Exception: - exception(_logger, 'Connection to %r failed', url) + _logger.exception('Connection to %r failed', url) break self._got_offline() if not timeout: @@ -323,7 +329,9 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): _logger.debug('Checkin %r context', context.guid) clone = self.fallback( method='GET', path=['context', context.guid], cmd='clone') - this.volume.patch(next(parcel.decode(clone))) + seqno, __ = this.volume.patch(next(parcel.decode(clone))) + if seqno: + ranges.exclude(self._push_r.value, seqno, seqno) pins = context['pins'] if pin and pin not in pins: this.volume['context'].update(context.guid, {'pins': pins + [pin]}) @@ -342,90 +350,52 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): else: directory.delete(context.guid) + def _push(self): + return + resource = None + metadata = None + + for diff in self._local.volume.diff(self._push_r.value, blobs=False): + if 'resource' in diff: + resource = diff['resource'] + metadata = self._local.volume[resource] + elif 'commit' in diff: + ranges.exclude(self._push_r.value, diff['commit']) + self._push_r.commit() + # No reasons to keep failure reports after pushing + self._local.volume['report'].wipe() + else: + props = {} + blobs = [] + for prop, meta in diff['patch'].items(): + if isinstance(metadata[prop], db.Blob): + blobs.application -class CachedClientRoutes(ClientRoutes): - def __init__(self, home_volume, api_url=None, no_subscription=False): - self._push_seq = toolkit.PersistentSequence( - join(home_volume.root, 'push.sequence'), [1, None]) - self._push_job = coroutine.Pool() - ClientRoutes.__init__(self, home_volume, api_url, no_subscription) - def _got_online(self, url): - ClientRoutes._got_online(self, url) - self._push_job.spawn(self._push) + props[prop] = meta['value'] - def _got_offline(self): - self._push_job.kill() - ClientRoutes._got_offline(self) - def _push(self): - # TODO should work using regular diff - return + if isinstance(diff, File): + with file(diff.path, 'rb') as f: + self.fallback(method='POST') - pushed_seq = toolkit.Sequence() - skiped_seq = toolkit.Sequence() - volume = self._local.volume - def push(request, seq): - try: - self.fallback(request) - except Exception: - _logger.exception('Cannot push %r, will postpone', request) - skiped_seq.include(seq) - else: - pushed_seq.include(seq) - - for res in volume.resources: - if volume.mtime(res) <= self._push_seq.mtime: - continue - - _logger.debug('Check %r local cache to push', res) - - for guid, patch in volume[res].diff(self._push_seq): - diff = {} - diff_seq = toolkit.Sequence() - post_requests = [] - for prop, meta, seqno in patch: - value = meta['value'] - diff[prop] = value - diff_seq.include(seqno, seqno) - if not diff: - continue - if 'guid' in diff: - request = Request(method='POST', path=[res]) - access = ACL.CREATE | ACL.WRITE - else: - request = Request(method='PUT', path=[res, guid]) - access = ACL.WRITE - for name in diff.keys(): - if not (volume[res].metadata[name].acl & access): - del diff[name] - request.content_type = 'application/json' - request.content = diff - push(request, diff_seq) - for request, seqno in post_requests: - push(request, [[seqno, seqno]]) - - if not pushed_seq: - if not self._push_seq.mtime: - self._push_seq.commit() - return - _logger.info('Pushed %r local cache', pushed_seq) - self._push_seq.exclude(pushed_seq) - if not skiped_seq: - self._push_seq.stretch() - if 'report' in volume: - # No any decent reasons to keep fail reports after uploding. - # TODO The entire offlile synchronization should be improved, - # for now, it is possible to have a race here - volume['report'].wipe() - self._push_seq.commit() + pass + + + if 'guid' in props: + request = Request(method='POST', path=[resource]) + else: + request = Request(method='PUT', path=[resource, guid]) + request.content_type = 'application/json' + request.content = props + self.fallback(request) class _LocalRoutes(db.Routes, Router): @@ -453,28 +423,3 @@ class _ResponseStream(object): except (http.ConnectionError, IncompleteRead): self._on_fail_cb() raise - - -class _Auth(http.SugarAuth): - - def __init__(self): - http.SugarAuth.__init__(self, client.keyfile.value) - if client.login.value: - self._login = client.login.value - self.allow_basic_auth = False - - def profile(self): - if self.allow_basic_auth and \ - client.login.value and client.password.value: - return None - import gconf - conf = gconf.client_get_default() - self._profile['name'] = conf.get_string('/desktop/sugar/user/nick') - return http.SugarAuth.profile(self) - - def __call__(self, nonce): - if not self.allow_basic_auth or \ - not client.login.value or not client.password.value: - return http.SugarAuth.__call__(self, nonce) - auth = b64encode('%s:%s' % (client.login.value, client.password.value)) - return 'Basic %s' % auth diff --git a/sugar_network/db/__init__.py b/sugar_network/db/__init__.py index b2ceb67..d6b12c5 100644 --- a/sugar_network/db/__init__.py +++ b/sugar_network/db/__init__.py @@ -235,8 +235,8 @@ The example code uses all mentioned above features:: return self.volume[document].create(item.properties(['prop1', 'prop2'])) @db.property_command(method='PUT', cmd='mutate') - def mutate(self, document, guid, prop, request): - self.volume[document].update(guid, {prop: request.content}) + def mutate(self, document, guid, prop): + self.volume[document].update(guid, {prop: this.request.content}) volume = db.Volume('db', [MyDocyment]) cp = MyCommands(volume) diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index cfbe517..54fd78a 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -190,8 +190,8 @@ class Blobs(object): break def patch(self, patch, seqno): - if 'path' in patch: - path = self.path(patch.pop('path')) + if 'path' in patch.meta: + path = self.path(patch.meta.pop('path')) else: path = self._blob_path(patch.digest) if not patch.size: @@ -202,9 +202,9 @@ class Blobs(object): os.rename(patch.path, path) if exists(path + _META_SUFFIX): meta = _read_meta(path) - meta.update(patch) + meta.update(patch.meta) else: - meta = patch + meta = patch.meta meta['x-seqno'] = str(seqno) _write_meta(path, meta, seqno) os.utime(path, (seqno, seqno)) diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 7fe127d..ecda920 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -20,7 +20,8 @@ from os.path import exists, join from sugar_network import toolkit from sugar_network.db.storage import Storage from sugar_network.db.metadata import Metadata, Guid -from sugar_network.toolkit import exception, enforce +from sugar_network.toolkit.router import ACL +from sugar_network.toolkit import enforce # To invalidate existed index on stcuture changes @@ -173,7 +174,7 @@ class Directory(object): self._index.store(guid, props) yield except Exception: - exception('Cannot populate %r in %r, invalidate it', + _logger.exception('Cannot populate %r in %r, invalidate it', guid, self.metadata.name) record.invalidate() @@ -227,8 +228,11 @@ class Directory(object): def _prestore(self, guid, changes, event): doc = self.resource(guid, self._storage.get(guid), posts=changes) - doc.post_seqno = self._seqno.next() + # It is important to iterate the `changes` by keys, + # values might be changed during iteration for prop in changes.keys(): + if not doc.post_seqno and not doc.metadata[prop].acl & ACL.LOCAL: + doc.post_seqno = self._seqno.next() doc.post(prop, changes[prop]) for prop in self.metadata.keys(): enforce(doc[prop] is not None, 'Empty %r property', prop) diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py index eb8f0cb..89ea6e8 100644 --- a/sugar_network/db/index.py +++ b/sugar_network/db/index.py @@ -23,7 +23,7 @@ from os.path import exists, join import xapian from sugar_network.db.metadata import GUID_PREFIX -from sugar_network.toolkit import Option, coroutine, exception, enforce +from sugar_network.toolkit import Option, coroutine, enforce index_flush_timeout = Option( @@ -398,7 +398,7 @@ class IndexWriter(IndexReader): self._db = xapian.WritableDatabase(self._path, xapian.DB_CREATE_OR_OPEN) except xapian.DatabaseError: - exception('Cannot open Xapian index in %r, will rebuild it', + _logger.exception('Cannot open Xapian %r index, will rebuild', self.metadata.name) shutil.rmtree(self._path, ignore_errors=True) self._db = xapian.WritableDatabase(self._path, diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index 67a6d13..e820fc9 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -381,7 +381,10 @@ class Aggregated(Composite): self._subtype.teardown(value) def typecast(self, value): - return dict(value) + raise RuntimeError('Aggregated properties cannot be set directly') + + def reprcast(self, value): + return [(i, self.subreprcast(j['value'])) for i, j in value.items()] def encode(self, items): for agg in items.values(): diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 38c1ce4..9af5086 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -85,7 +85,7 @@ class Resource(object): def status(self, value): return value - @indexed_property(List, prefix='RP', default=[]) + @indexed_property(List, prefix='RP', default=[], acl=ACL.READ) def pins(self, value): return value @@ -163,7 +163,7 @@ class Resource(object): def diff(self, r): patch = {} for name, prop in self.metadata.items(): - if name == 'seqno' or prop.acl & ACL.CALC: + if name == 'seqno' or prop.acl & (ACL.CALC | ACL.LOCAL): continue meta = self.meta(name) if meta is None: @@ -203,15 +203,18 @@ class Resource(object): prop = self.metadata[prop] if prop.on_set is not None: value = prop.on_set(self, value) - if isinstance(prop, Aggregated): + seqno = None + if not prop.acl & ACL.LOCAL: + seqno = meta['seqno'] = self.post_seqno + if seqno and isinstance(prop, Aggregated): for agg in value.values(): - agg['seqno'] = self.post_seqno + agg['seqno'] = seqno if isinstance(prop, Composite): orig_value = self.orig(prop.name) if orig_value: orig_value.update(value) value = orig_value - self.record.set(prop.name, value=value, seqno=self.post_seqno, **meta) + self.record.set(prop.name, value=value, **meta) self.posts[prop.name] = value def __contains__(self, prop): diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index f319658..c74a93e 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -19,7 +19,7 @@ from contextlib import contextmanager from sugar_network import toolkit from sugar_network.db.metadata import Aggregated -from sugar_network.toolkit.router import ACL, File, route, fallbackroute +from sugar_network.toolkit.router import ACL, route, fallbackroute from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, parcel, enforce @@ -32,64 +32,62 @@ _logger = logging.getLogger('db.routes') class Routes(object): def __init__(self, volume, find_limit=None): - self.volume = volume + this.volume = self.volume = volume self._find_limit = find_limit - this.volume = self.volume @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') - def create(self, request): - with self._post(request, ACL.CREATE) as doc: + def create(self): + with self._post(ACL.CREATE) as doc: doc.created() - if request.principal: + if this.principal: authors = doc.posts['author'] = {} - self._useradd(authors, request.principal, ACL.ORIGINAL) - self.volume[request.resource].create(doc.posts) + self._useradd(authors, this.principal, ACL.ORIGINAL) + self.volume[this.request.resource].create(doc.posts) return doc['guid'] @route('GET', [None], arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, mime_type='application/json') - def find(self, request, reply, limit): - self._preget(request) - if self._find_limit: - if limit <= 0: - request['limit'] = self._find_limit - elif limit > self._find_limit: - _logger.warning('The find limit is restricted to %s', - self._find_limit) - request['limit'] = self._find_limit + def find(self, reply, limit): + self._preget() + request = this.request + if self._find_limit and limit > self._find_limit: + _logger.warning('The find limit is restricted to %s', + self._find_limit) + request['limit'] = self._find_limit documents, total = self.volume[request.resource].find( not_state='deleted', **request) - result = [self._postget(request, i, reply) for i in documents] + result = [self._postget(i, reply) for i in documents] return {'total': total, 'result': result} @route('GET', [None, None], cmd='exists', mime_type='application/json') - def exists(self, request): - return self.volume[request.resource][request.guid].exists + def exists(self): + return self.volume[this.request.resource][this.request.guid].exists @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR) - def update(self, request): - with self._post(request, ACL.WRITE) as doc: + def update(self): + with self._post(ACL.WRITE) as doc: if not doc.posts: return doc.updated() - self.volume[request.resource].update(doc.guid, doc.posts) + self.volume[this.request.resource].update(doc.guid, doc.posts) @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR) - def update_prop(self, request): + def update_prop(self): + request = this.request if request.content is None: value = request.content_stream else: value = request.content request.content = {request.prop: value} - self.update(request) + self.update() @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) - def delete(self, request): + def delete(self): # Node data should not be deleted immediately # to make master-slave synchronization possible - directory = self.volume[request.resource] - doc = directory[request.guid] + directory = self.volume[this.request.resource] + doc = directory[this.request.guid] enforce(doc.exists, http.NotFound, 'Resource not found') doc.posts['state'] = 'deleted' doc.updated() @@ -97,45 +95,43 @@ class Routes(object): @route('GET', [None, None], arguments={'reply': list}, mime_type='application/json') - def get(self, request, reply): + def get(self, reply): if not reply: reply = [] - for prop in self.volume[request.resource].metadata.values(): - if prop.acl & ACL.READ and not (prop.acl & ACL.LOCAL) and \ - not isinstance(prop, Aggregated): + for prop in self.volume[this.request.resource].metadata.values(): + if prop.acl & ACL.READ and not isinstance(prop, Aggregated): reply.append(prop.name) - self._preget(request) - doc = self.volume[request.resource].get(request.guid) + self._preget() + doc = self.volume[this.request.resource].get(this.request.guid) enforce(doc.exists and doc['state'] != 'deleted', http.NotFound, 'Resource not found') - return self._postget(request, doc, reply) + return self._postget(doc, reply) @route('GET', [None, None, None], mime_type='application/json') - def get_prop(self, request, response): + def get_prop(self): + request = this.request directory = self.volume[request.resource] directory.metadata[request.prop].assert_access(ACL.READ) - value = directory[request.guid].repr(request.prop) - enforce(value is not File.AWAY, http.NotFound, 'No blob') - return value + return directory[request.guid].repr(request.prop) @route('HEAD', [None, None, None]) - def get_prop_meta(self, request, response): - return self.get_prop(request, response) + def get_prop_meta(self): + return self.get_prop() @route('POST', [None, None, None], acl=ACL.AUTH, mime_type='application/json') - def insert_to_aggprop(self, request): - return self._aggpost(request, ACL.INSERT) + def insert_to_aggprop(self): + return self._aggpost(ACL.INSERT) @route('PUT', [None, None, None, None], acl=ACL.AUTH, mime_type='application/json') - def update_aggprop(self, request): - self._aggpost(request, ACL.REPLACE, request.key) + def update_aggprop(self): + self._aggpost(ACL.REPLACE) @route('DELETE', [None, None, None, None], acl=ACL.AUTH, mime_type='application/json') - def remove_from_aggprop(self, request): - self._aggpost(request, ACL.REMOVE, request.key) + def remove_from_aggprop(self): + self._aggpost(ACL.REMOVE) @route('GET', [None, None, None, None], mime_type='application/json') def get_aggprop(self): @@ -147,13 +143,12 @@ class Routes(object): agg_value = doc[prop.name].get(this.request.key) enforce(agg_value is not None, http.NotFound, 'Aggregated item not found') - value = prop.subreprcast(agg_value['value']) - enforce(value is not File.AWAY, http.NotFound, 'No blob') - return value + return prop.subreprcast(agg_value['value']) @route('PUT', [None, None], cmd='useradd', arguments={'role': 0}, acl=ACL.AUTH | ACL.AUTHOR) - def useradd(self, request, user, role): + def useradd(self, user, role): + request = this.request enforce(user, "Argument 'user' is not specified") directory = self.volume[request.resource] authors = directory.get(request.guid)['author'] @@ -161,9 +156,10 @@ class Routes(object): directory.update(request.guid, {'author': authors}) @route('PUT', [None, None], cmd='userdel', acl=ACL.AUTH | ACL.AUTHOR) - def userdel(self, request, user): + def userdel(self, user): + request = this.request enforce(user, "Argument 'user' is not specified") - enforce(user != request.principal, 'Cannot remove yourself') + enforce(user != this.principal, 'Cannot remove yourself') directory = self.volume[request.resource] authors = directory.get(request.guid)['author'] enforce(user in authors, 'No such user') @@ -171,38 +167,36 @@ class Routes(object): directory.update(request.guid, {'author': authors}) @route('GET', [None, None], cmd='clone') - def clone(self, request): - clone = self.volume.clone(request.resource, request.guid) + def clone(self): + clone = self.volume.clone(this.request.resource, this.request.guid) return parcel.encode([('push', None, clone)]) @fallbackroute('GET', ['blobs']) def blobs(self): - return this.volume.blobs.get(this.request.guid) - - def on_aggprop_update(self, request, prop, value): - pass + return self.volume.blobs.get(this.request.guid) @contextmanager - def _post(self, request, access): - content = request.content + def _post(self, access): + content = this.request.content enforce(isinstance(content, dict), http.BadRequest, 'Invalid value') if access == ACL.CREATE: - if 'guid' in content: - # TODO Temporal security hole, see TODO - guid = content['guid'] + guid = content.get('guid') + if guid: + enforce(this.principal and this.principal.admin, + http.BadRequest, 'GUID should not be specified') enforce(_GUID_RE.match(guid) is not None, - http.BadRequest, 'Malformed %s GUID', guid) + http.BadRequest, 'Malformed GUID') else: guid = toolkit.uuid() - doc = self.volume[request.resource][guid] + doc = self.volume[this.request.resource][guid] enforce(not doc.exists, 'Resource already exists') doc.posts['guid'] = guid for name, prop in doc.metadata.items(): if name not in content and prop.default is not None: doc.posts[name] = prop.default else: - doc = self.volume[request.resource][request.guid] + doc = self.volume[this.request.resource][this.request.guid] enforce(doc.exists, 'Resource not found') this.resource = doc @@ -223,7 +217,7 @@ class Routes(object): except Exception, error: error = 'Value %r for %r property is invalid: %s' % \ (value, prop.name, error) - toolkit.exception(error) + _logger.exception(error) raise http.BadRequest(error) yield doc except Exception: @@ -232,22 +226,19 @@ class Routes(object): else: teardown(doc.origs, doc.posts) - def _preget(self, request): - reply = request.get('reply') + def _preget(self): + reply = this.request.get('reply') if not reply: - request['reply'] = ('guid',) + this.request['reply'] = ('guid',) else: - directory = self.volume[request.resource] + directory = self.volume[this.request.resource] for prop in reply: directory.metadata[prop].assert_access(ACL.READ) - def _postget(self, request, doc, props): + def _postget(self, doc, props): result = {} for name in props: - value = doc.repr(name) - if isinstance(value, File): - value = value.url - result[name] = value + result[name] = doc.repr(name) return result def _useradd(self, authors, user, role): @@ -270,20 +261,29 @@ class Routes(object): props['order'] = 0 authors[user] = props - def _aggpost(self, request, acl, aggid=None): + def _aggpost(self, acl): + request = this.request doc = this.resource = self.volume[request.resource][request.guid] prop = doc.metadata[request.prop] enforce(isinstance(prop, Aggregated), http.BadRequest, 'Property is not aggregated') prop.assert_access(acl) + def enforce_authority(author): + if prop.acl & ACL.AUTHOR: + author = doc['author'] + enforce(not author or this.principal in author or + this.principal and this.principal.admin, + http.Forbidden, 'Authors only') + + aggid = request.key if aggid and aggid in doc[request.prop]: aggvalue = doc[request.prop][aggid] - self.on_aggprop_update(request, prop, aggvalue) + enforce_authority(aggvalue.get('author')) prop.subteardown(aggvalue['value']) else: enforce(acl != ACL.REMOVE, http.NotFound, 'No aggregated item') - self.on_aggprop_update(request, prop, None) + enforce_authority(None) aggvalue = {} if acl != ACL.REMOVE: @@ -299,10 +299,10 @@ class Routes(object): aggid = toolkit.uuid() aggvalue['value'] = value - if request.principal: + if this.principal: authors = aggvalue['author'] = {} - role = ACL.ORIGINAL if request.principal in doc['author'] else 0 - self._useradd(authors, request.principal, role) + role = ACL.ORIGINAL if this.principal in doc['author'] else 0 + self._useradd(authors, this.principal, role) doc.posts[request.prop] = {aggid: aggvalue} doc.updated() self.volume[request.resource].update(request.guid, doc.posts) diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 7bf738c..295fc02 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -23,6 +23,7 @@ from sugar_network.db.metadata import Blob from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter from sugar_network.db.blobs import Blobs +from sugar_network.toolkit.router import File from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, coroutine, ranges, enforce @@ -34,7 +35,7 @@ class Volume(dict): _flush_pool = [] - def __init__(self, root, documents, index_class=None): + def __init__(self, root, resources, index_class=None): Volume._flush_pool.append(self) self.resources = {} self.mute = False @@ -49,12 +50,10 @@ class Volume(dict): if not exists(root): os.makedirs(root) self._index_class = index_class - self.seqno = toolkit.Seqno(join(self._root, 'var', 'db.seqno')) - self.releases_seqno = toolkit.Seqno( - join(self._root, 'var', 'releases.seqno')) + self.seqno = toolkit.Seqno(join(self._root, 'var', 'seqno')) self.blobs = Blobs(root, self.seqno) - for document in documents: + for document in resources: if isinstance(document, basestring): name = document.split('.')[-1] else: @@ -72,14 +71,13 @@ class Volume(dict): while self: __, cls = self.popitem() cls.close() - self.releases_seqno.commit() def populate(self): for cls in self.values(): for __ in cls.populate(): coroutine.dispatch() - def diff(self, r, exclude=None, files=None, one_way=False): + def diff(self, r, exclude=None, files=None, blobs=True, one_way=False): if exclude: include = deepcopy(r) ranges.exclude(include, exclude) @@ -105,14 +103,15 @@ class Volume(dict): yield {'guid': doc.guid, 'patch': patch} found = True last_seqno = max(last_seqno, doc['seqno']) - for blob in self.blobs.diff(include): - seqno = int(blob.pop('x-seqno')) - yield blob - found = True - last_seqno = max(last_seqno, seqno) + if blobs: + for blob in self.blobs.diff(include): + seqno = int(blob.meta.pop('x-seqno')) + yield blob + found = True + last_seqno = max(last_seqno, seqno) for dirpath in files or []: for blob in self.blobs.diff(include, dirpath): - seqno = int(blob.pop('x-seqno')) + seqno = int(blob.meta.pop('x-seqno')) yield blob found = True last_seqno = max(last_seqno, seqno) @@ -142,25 +141,24 @@ class Volume(dict): seqno = None for record in records: - resource_ = record.get('resource') - if resource_: - directory = self[resource_] - continue - - if 'guid' in record: - seqno = directory.patch(record['guid'], record['patch'], seqno) - continue - - if 'content-length' in record: + if isinstance(record, File): if seqno is None: seqno = self.seqno.next() self.blobs.patch(record, seqno) continue - + resource = record.get('resource') + if resource: + directory = self[resource] + continue + guid = record.get('guid') + if guid is not None: + seqno = directory.patch(guid, record['patch'], seqno) + continue commit = record.get('commit') if commit is not None: ranges.include(committed, commit) continue + raise http.BadRequest('Malformed patch') return seqno, committed diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index 4ff89ff..c6b3321 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -28,7 +28,7 @@ from sugar_network.toolkit.spec import EMPTY_LICENSE from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit.router import ACL -from sugar_network.toolkit import i18n, http, svg_to_png, exception, enforce +from sugar_network.toolkit import i18n, http, svg_to_png, enforce CONTEXT_TYPES = [ @@ -87,6 +87,9 @@ class Release(object): context=this.request.guid) return release['bundles']['*-*']['blob'], release + def reprcast(self, release): + return this.volume.blobs.get(release['bundles']['*-*']['blob']) + def teardown(self, release): if this.resource.exists and \ 'activity' not in this.resource['type'] and \ @@ -180,19 +183,21 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): 'unpack_size': unpack_size, }, } - blob['content-type'] = 'application/vnd.olpc-sugar' + blob.meta['content-type'] = 'application/vnd.olpc-sugar' enforce(context, http.BadRequest, 'Context is not specified') enforce(version, http.BadRequest, 'Version is not specified') release['version'] = parse_version(version) doc = this.volume['context'][context] - if initial: - if not doc.exists: - enforce(context_meta, http.BadRequest, 'No way to initate context') - context_meta['guid'] = context - context_meta['type'] = [context_type] - this.call(method='POST', path=['context'], content=context_meta) + if initial and not doc.exists: + enforce(context_meta, http.BadRequest, 'No way to initate context') + context_meta['guid'] = context + context_meta['type'] = [context_type] + with this.principal as principal: + principal.admin = True + this.call(method='POST', path=['context'], content=context_meta, + principal=principal) else: enforce(doc.exists, http.NotFound, 'No context') enforce(context_type in doc['type'], @@ -207,10 +212,11 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): _logger.debug('Load %r release: %r', context, release) - if this.request.principal in doc['author']: + if this.principal in doc['author']: patch = doc.format_patch(context_meta) if patch: - this.call(method='PUT', path=['context', context], content=patch) + this.call(method='PUT', path=['context', context], content=patch, + principal=this.principal) doc.posts.update(patch) # TRANS: Release notes title title = i18n._('%(name)s %(version)s release') @@ -227,13 +233,13 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): ), 'message': release_notes or '', }, - content_type='application/json') + content_type='application/json', principal=this.principal) - blob['content-disposition'] = 'attachment; filename="%s-%s%s"' % ( - ''.join(i18n.decode(doc['title']).split()), - version, mimetypes.guess_extension(blob.get('content-type')) or '', + blob.meta['content-disposition'] = 'attachment; filename="%s-%s%s"' % ( + ''.join(i18n.decode(doc['title']).split()), version, + mimetypes.guess_extension(blob.meta.get('content-type')) or '', ) - this.volume.blobs.update(blob.digest, blob) + this.volume.blobs.update(blob.digest, blob.meta) return context, release @@ -261,7 +267,7 @@ def _load_context_metadata(bundle, spec): icon_file.close() except Exception: - exception(_logger, 'Failed to load icon') + _logger.exception('Failed to load icon') msgids = {} for prop, confname in [ @@ -289,6 +295,6 @@ def _load_context_metadata(bundle, spec): if lang == 'en' or msgstr != value: result[prop][lang] = msgstr except Exception: - exception(_logger, 'Gettext failed to read %r', mo_path[-1]) + _logger.exception('Gettext failed to read %r', mo_path[-1]) return result diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py index 5e12360..78df790 100644 --- a/sugar_network/model/context.py +++ b/sugar_network/model/context.py @@ -113,7 +113,7 @@ class Context(db.Resource): def rating(self, value): return value - @db.stored_property(default='', acl=ACL.PUBLIC | ACL.LOCAL) + @db.stored_property(default='', acl=ACL.PUBLIC) def dependencies(self, value): """Software dependencies. @@ -122,20 +122,3 @@ class Context(db.Resource): """ return value - - def created(self): - db.Resource.created(self) - self._invalidate_solutions() - - def updated(self): - db.Resource.updated(self) - self._invalidate_solutions() - - def _invalidate_solutions(self): - if self['releases'] and \ - [i for i in ('state', 'releases', 'dependencies') - if i in self.posts and self.posts[i] != self.orig(i)]: - this.broadcast({ - 'event': 'release', - 'seqno': this.volume.releases_seqno.next(), - }) diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index fb409d4..eda26dc 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -35,50 +35,32 @@ class FrontRoutes(object): return _HELLO_HTML @route('OPTIONS') - def options(self, request, response): - if request.environ['HTTP_ORIGIN']: + def options(self): + response = this.response + environ = this.request.environ + if environ['HTTP_ORIGIN']: response['Access-Control-Allow-Methods'] = \ - request.environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD'] + environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD'] response['Access-Control-Allow-Headers'] = \ - request.environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS'] + environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS'] else: response['Allow'] = 'GET, HEAD, POST, PUT, DELETE' response.content_length = 0 @route('GET', cmd='subscribe', mime_type='text/event-stream') - def subscribe(self, request=None, response=None, **condition): + def subscribe(self, **condition): """Subscribe to Server-Sent Events.""" - if request is not None and not condition: - condition = request - if response is not None: - response.content_type = 'text/event-stream' - response['Cache-Control'] = 'no-cache' - return self._pull_events(request, condition) + this.response['Cache-Control'] = 'no-cache' - @route('GET', ['robots.txt'], mime_type='text/plain') - def robots(self, request, response): - return 'User-agent: *\nDisallow: /\n' - - @route('GET', ['favicon.ico']) - def favicon(self): - return this.volume.blobs.get('favicon.ico') - - def _broadcast(self, event): - _logger.debug('Broadcast event: %r', event) - self._spooler.notify_all(event) - - def _pull_events(self, request, condition): _logger.debug('Start %s-nth subscription', self._spooler.waiters + 1) # Unblock `GET /?cmd=subscribe` call to let non-greenlet application # initiate a subscription and do not stuck in waiting for the 1st event yield {'event': 'pong'} - subscription = None - if request is not None: - subscription = request.content_stream - if subscription is not None: - coroutine.spawn(self._wait_for_closing, subscription) + subscription = this.request.content_stream + if subscription is not None: + coroutine.spawn(self._wait_for_closing, subscription) while True: event = self._spooler.wait() @@ -98,6 +80,18 @@ class FrontRoutes(object): _logger.debug('Stop %s-nth subscription', self._spooler.waiters) + @route('GET', ['robots.txt'], mime_type='text/plain') + def robots(self): + return 'User-agent: *\nDisallow: /\n' + + @route('GET', ['favicon.ico']) + def favicon(self): + return this.volume.blobs.get('favicon.ico') + + def _broadcast(self, event): + _logger.debug('Broadcast event: %r', event) + self._spooler.notify_all(event) + def _wait_for_closing(self, rfile): try: coroutine.select([rfile.fileno()], [], []) diff --git a/sugar_network/model/user.py b/sugar_network/model/user.py index b44093e..41f48a0 100644 --- a/sugar_network/model/user.py +++ b/sugar_network/model/user.py @@ -31,6 +31,6 @@ class User(db.Resource): def birthday(self, value): return value - @db.stored_property(db.Blob, acl=ACL.CREATE, mime_type='text/plain') + @db.stored_property(acl=ACL.READ | ACL.CREATE) def pubkey(self, value): return value diff --git a/sugar_network/node/auth.py b/sugar_network/node/auth.py new file mode 100644 index 0000000..27b334c --- /dev/null +++ b/sugar_network/node/auth.py @@ -0,0 +1,118 @@ +# Copyright (C) 2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import time +import hashlib +import logging +from ConfigParser import ConfigParser +from os.path import join, dirname, exists, expanduser, abspath + +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import pylru, http, enforce + + +_SIGNATURE_LIFETIME = 600 +_AUTH_POOL_SIZE = 1024 + +_logger = logging.getLogger('node.auth') + + +class Unauthorized(http.Unauthorized): + + def __init__(self, message, nonce=None): + http.Unauthorized.__init__(self, message) + if not nonce: + nonce = int(time.time()) + _SIGNATURE_LIFETIME + self.headers = {'www-authenticate': 'Sugar nonce="%s"' % nonce} + + +class Principal(str): + + admin = False + editor = False + translator = False + + _backup = None + + def __enter__(self): + self._backup = (self.admin, self.editor, self.translator) + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.admin, self.editor, self.translator = self._backup + self._backup = None + + +class SugarAuth(object): + + def __init__(self, root): + self._config_path = join(root, 'etc', 'authorization.conf') + self._pool = pylru.lrucache(_AUTH_POOL_SIZE) + self._config = None + + def reload(self): + self._config = ConfigParser() + if exists(self._config_path): + self._config.read(self._config_path) + self._pool.clear() + + def logon(self, request): + auth = request.environ.get('HTTP_AUTHORIZATION') + enforce(auth, Unauthorized, 'No credentials') + + if self._config is None: + self.reload() + + from M2Crypto import RSA, BIO + from urllib2 import parse_http_list, parse_keqv_list + + if auth in self._pool: + login, nonce = self._pool[auth] + else: + scheme, creds = auth.strip().split(' ', 1) + enforce(scheme.lower() == 'sugar', http.BadRequest, + 'Unsupported authentication scheme') + creds = parse_keqv_list(parse_http_list(creds)) + login = creds['username'] + signature = creds['signature'] + nonce = int(creds['nonce']) + user = this.volume['user'][login] + enforce(user.exists, Unauthorized, 'Principal does not exist') + key = RSA.load_pub_key_bio(BIO.MemoryBuffer(str(user['pubkey']))) + data = hashlib.sha1('%s:%s' % (login, nonce)).digest() + enforce(key.verify(data, signature.decode('hex')), + http.Forbidden, 'Bad credentials') + self._pool[auth] = (login, nonce) + + enforce(abs(time.time() - nonce) <= _SIGNATURE_LIFETIME, + Unauthorized, 'Credentials expired') + principal = Principal(login) + + user = principal + if not self._config.has_option('permissions', user): + user = 'default' + if not self._config.has_option('permissions', user): + user = None + if user: + for role in self._config.get('permissions', user).split(): + role = role.lower() + if role == 'admin': + principal.admin = True + elif role == 'editor': + principal.editor = True + elif role == 'translator': + principal.translator = True + + return principal diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index b93dcbc..c5b15e6 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -58,8 +58,8 @@ class MasterRoutes(NodeRoutes): @route('PUT', ['context', None], cmd='presolve', acl=ACL.AUTH, mime_type='application/json') - def presolve(self, request): - aliases = this.volume['context'].get(request.guid)['aliases'] + def presolve(self): + aliases = this.volume['context'].get(this.request.guid)['aliases'] enforce(aliases, http.BadRequest, 'Nothing to presolve') return obs.presolve(None, aliases, this.volume.blobs.path('packages')) diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 8f9819b..b1cb401 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -16,10 +16,10 @@ import bisect import hashlib import logging +from os.path import join -from sugar_network import db +from sugar_network import db, toolkit from sugar_network.model import Release, context as _context, user as _user - from sugar_network.node import obs from sugar_network.toolkit.router import ACL from sugar_network.toolkit.coroutine import this @@ -33,8 +33,7 @@ _presolve_queue = None class User(_user.User): def created(self): - with file(this.volume.blobs.get(self['pubkey']).path) as f: - self.posts['guid'] = str(hashlib.sha1(f.read()).hexdigest()) + self.posts['guid'] = str(hashlib.sha1(self['pubkey']).hexdigest()) class _Release(Release): @@ -107,6 +106,34 @@ class Context(_context.Context): def releases(self, value): return value + def created(self): + _context.Context.created(self) + self._invalidate_solutions() + + def updated(self): + _context.Context.updated(self) + self._invalidate_solutions() + + def _invalidate_solutions(self): + if self['releases'] and \ + [i for i in ('state', 'releases', 'dependencies') + if i in self.posts and self.posts[i] != self.orig(i)]: + this.broadcast({ + 'event': 'release', + 'seqno': this.volume.release_seqno.next(), + }) + + +class Volume(db.Volume): + + def __init__(self, root, resources, **kwargs): + db.Volume.__init__(self, root, resources, **kwargs) + self.release_seqno = toolkit.Seqno(join(root, 'var', 'seqno-release')) + + def close(self): + db.Volume.close(self) + self.release_seqno.commit() + def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, stability=None, requires=None): @@ -199,7 +226,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, blob = volume.blobs.get(digest) if blob is not None: release_info['size'] = blob.size - release_info['content-type'] = blob['content-type'] + release_info['content-type'] = blob.meta['content-type'] unpack_size = release['bundles']['*-*'].get('unpack_size') if unpack_size is not None: release_info['unpack_size'] = unpack_size diff --git a/sugar_network/node/obs.py b/sugar_network/node/obs.py index 796ea7c..0c68a6e 100644 --- a/sugar_network/node/obs.py +++ b/sugar_network/node/obs.py @@ -84,7 +84,7 @@ def presolve(repo_name, packages, dst_path): to_download.append((url, path)) files.setdefault(arch, []).append(binary) except Exception: - toolkit.exception(_logger, 'Failed to presolve %r on %s', + _logger.exception('Failed to presolve %r on %s', packages, repo['name']) continue diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index ea23297..4457b2f 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -13,51 +13,70 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import time import logging -import hashlib -from ConfigParser import ConfigParser -from os.path import join, exists +from os.path import join -from sugar_network import db, node +from sugar_network import db from sugar_network.model import FrontRoutes, load_bundle from sugar_network.node import model # pylint: disable-msg=W0611 -from sugar_network.toolkit.router import route, preroute, postroute, ACL, File -from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute +from sugar_network.toolkit.router import route, postroute, ACL, File +from sugar_network.toolkit.router import Request, fallbackroute, preroute from sugar_network.toolkit.spec import parse_requires, parse_version from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import pylru, http, coroutine, exception, enforce +from sugar_network.toolkit import http, coroutine, enforce -_MAX_STAT_RECORDS = 100 -_AUTH_POOL_SIZE = 1024 - _logger = logging.getLogger('node.routes') class NodeRoutes(db.Routes, FrontRoutes): - def __init__(self, guid, **kwargs): + def __init__(self, guid, auth=None, **kwargs): db.Routes.__init__(self, **kwargs) FrontRoutes.__init__(self) self._guid = guid - self._auth_pool = pylru.lrucache(_AUTH_POOL_SIZE) - self._auth_config = None - self._auth_config_mtime = 0 + self._auth = auth @property def guid(self): return self._guid + @preroute + def preroute(self, op): + request = this.request + if request.principal: + this.principal = request.principal + elif op.acl & ACL.AUTH: + this.principal = self._auth.logon(request) + else: + this.principal = None + if op.acl & ACL.AUTHOR and request.guid: + if not this.principal: + this.principal = self._auth.logon(request) + allowed = this.principal.admin + if not allowed: + if request.resource == 'user': + allowed = (this.principal == request.guid) + else: + doc = self.volume[request.resource].get(request.guid) + allowed = this.principal in doc['author'] + enforce(allowed, http.Forbidden, 'Authors only') + if op.acl & ACL.SUPERUSER: + if not this.principal: + this.principal = self._auth.logon(request) + enforce(this.principal.admin, http.Forbidden, 'Superusers only') + @route('GET', cmd='whoami', mime_type='application/json') - def whoami(self, request, response): + def whoami(self): roles = [] - if self.authorize(request.principal, 'root'): + if this.principal and this.principal.admin: roles.append('root') - return {'roles': roles, 'guid': request.principal, 'route': 'direct'} + return {'roles': roles, + 'guid': this.principal, + 'route': 'direct', + } @route('GET', cmd='status', mime_type='application/json') def status(self): @@ -69,45 +88,45 @@ class NodeRoutes(db.Routes, FrontRoutes): } @route('POST', ['user'], mime_type='application/json') - def register(self, request): + def register(self): # To avoid authentication while registering new user - self.create(request) + self.create() @fallbackroute('GET', ['packages']) - def route_packages(self, request, response): + def route_packages(self): path = this.request.path if path and path[-1] == 'updates': result = [] last_modified = 0 - for blob in this.volume.blobs.diff( + for blob in self.volume.blobs.diff( [[this.request.if_modified_since + 1, None]], join(*path[:-1]), recursive=False): if '.' in blob.name: continue result.append(blob.name) last_modified = max(last_modified, blob.mtime) - response.content_type = 'application/json' + this.response.content_type = 'application/json' if last_modified: - response.last_modified = last_modified + this.response.last_modified = last_modified return result - blob = this.volume.blobs.get(join(*path)) + blob = self.volume.blobs.get(join(*path)) if isinstance(blob, File): return blob else: - response.content_type = 'application/json' + this.response.content_type = 'application/json' return [i.name for i in blob if '.' not in i.name] @route('POST', ['context'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) def submit_release(self, initial): - blob = this.volume.blobs.post( + blob = self.volume.blobs.post( this.request.content_stream, this.request.content_type) try: context, release = load_bundle(blob, initial=initial) except Exception: - this.volume.blobs.delete(blob.digest) + self.volume.blobs.delete(blob.digest) raise this.call(method='POST', path=['context', context, 'releases'], content_type='application/json', content=release) @@ -116,88 +135,16 @@ class NodeRoutes(db.Routes, FrontRoutes): @route('GET', ['context', None], cmd='solve', arguments={'requires': list, 'stability': list}, mime_type='application/json') - def solve(self, request): - solution = model.solve(self.volume, request.guid, **request) + def solve(self): + solution = model.solve(self.volume, this.request.guid, **this.request) enforce(solution is not None, 'Failed to solve') return solution @route('GET', ['context', None], cmd='resolve', arguments={'requires': list, 'stability': list}) - def resolve(self, request): - solution = self.solve(request) - return this.volume.blobs.get(solution[request.guid]['blob']) - - @preroute - def preroute(self, op, request, response): - if op.acl & ACL.AUTH and request.principal is None: - if not request.authorization: - enforce(self.authorize(None, 'user'), - Unauthorized, 'No credentials') - else: - if request.authorization not in self._auth_pool: - self.authenticate(request.authorization) - self._auth_pool[request.authorization] = True - enforce(not request.authorization.nonce or - request.authorization.nonce >= time.time(), - Unauthorized, 'Credentials expired') - request.principal = request.authorization.login + def resolve(self): + solution = self.solve() + return self.volume.blobs.get(solution[this.request.guid]['blob']) - if op.acl & ACL.AUTHOR and request.guid: - self._enforce_authority(request) - if op.acl & ACL.SUPERUSER: - enforce(self.authorize(request.principal, 'root'), http.Forbidden, - 'Operation is permitted only for superusers') - - def on_aggprop_update(self, request, prop, value): - if prop.acl & ACL.AUTHOR: - self._enforce_authority(request) - elif value is not None: - self._enforce_authority(request, value.get('author')) - - def authenticate(self, auth): - enforce(auth.scheme == 'sugar', http.BadRequest, - 'Unknown authentication scheme') - enforce(self.volume['user'][auth.login].exists, Unauthorized, - 'Principal does not exist') - - from M2Crypto import RSA - - pubkey = self.volume['user'][auth.login]['pubkey'] - key = RSA.load_pub_key(this.volume.blobs.get(pubkey).path) - data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() - enforce(key.verify(data, auth.signature.decode('hex')), - http.Forbidden, 'Bad credentials') - - def authorize(self, user, role): - if role == 'user' and user: - return True - - config_path = join(node.data_root.value, 'authorization.conf') - if exists(config_path): - mtime = os.stat(config_path).st_mtime - if mtime > self._auth_config_mtime: - self._auth_config_mtime = mtime - self._auth_config = ConfigParser() - self._auth_config.read(config_path) - if self._auth_config is None: - return False - - if not user: - user = 'anonymous' - if not self._auth_config.has_section(user): - user = 'DEFAULT' - if self._auth_config.has_option(user, role): - return self._auth_config.get(user, role).strip().lower() in \ - ('true', 'on', '1', 'allow') - - def _enforce_authority(self, request, author=None): - if request.resource == 'user': - allowed = (request.principal == request.guid) - else: - if author is None: - doc = self.volume[request.resource].get(request.guid) - author = doc['author'] - allowed = request.principal in author - enforce(allowed or self.authorize(request.principal, 'root'), - http.Forbidden, 'Operation is permitted only for authors') +this.principal = None diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 76593e9..074ae79 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -41,9 +41,17 @@ _logger = logging.getLogger('node.slave') class SlaveRoutes(NodeRoutes): def __init__(self, volume, **kwargs): - self._creds = http.SugarAuth( - join(volume.root, 'etc', 'private', 'node')) - NodeRoutes.__init__(self, self._creds.login, volume=volume, **kwargs) + guid_path = join(volume.root, 'etc', 'node') + if exists(guid_path): + with file(guid_path) as f: + guid = f.read().strip() + else: + guid = toolkit.uuid() + if not exists(dirname(guid_path)): + os.makedirs(dirname(guid_path)) + with file(guid_path, 'w') as f: + f.write(guid) + NodeRoutes.__init__(self, guid, volume=volume, **kwargs) vardir = join(volume.root, 'var') self._push_r = toolkit.Bin(join(vardir, 'push.ranges'), [[1, None]]) self._pull_r = toolkit.Bin(join(vardir, 'pull.ranges'), [[1, None]]) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 70868c0..675c25f 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -79,41 +79,6 @@ def enforce(condition, error=None, *args): raise exception_class(error) -def exception(*args): - """Log about exception on low log level. - - That might be useful for non-critial exception. Input arguments are the - same as for `logging.exception` function. - - :param args: - optional arguments to pass to logging function; - the first argument might be a `logging.Logger` to use instead of - using direct `logging` calls - - """ - if args and isinstance(args[0], logging.Logger): - logger = args[0] - args = args[1:] - else: - logger = logging - - klass, error, tb = sys.exc_info() - - import traceback - tb = [i.rstrip() for i in traceback.format_exception(klass, error, tb)] - - error_message = str(error) or '%s exception' % type(error).__name__ - if args: - if len(args) == 1: - message = args[0] - else: - message = args[0] % args[1:] - error_message = '%s: %s' % (message, error_message) - - logger.error(error_message) - logger.debug('\n'.join(tb)) - - def ascii(value): if not isinstance(value, basestring): return str(value) @@ -159,15 +124,6 @@ def init_logging(debug_level=None, **kwargs): else: logging_level = 8 - def disable_logger(loggers): - for log_name in loggers: - logger = logging.getLogger(log_name) - logger.propagate = False - logger.addHandler(_NullHandler()) - - logging.Logger.trace = lambda self, message, *args, **kwargs: None - logging.Logger.heartbeat = lambda self, message, *args, **kwargs: None - if logging_level <= 8: logging.Logger.trace = lambda self, message, *args, **kwargs: \ self._log(9, message, args, **kwargs) @@ -176,18 +132,18 @@ def init_logging(debug_level=None, **kwargs): elif logging_level == 9: logging.Logger.trace = lambda self, message, *args, **kwargs: \ self._log(9, message, args, **kwargs) - disable_logger(['sugar_stats']) else: - disable_logger([ - 'requests.packages.urllib3.connectionpool', - 'requests.packages.urllib3.poolmanager', - 'requests.packages.urllib3.response', - 'requests.packages.urllib3', - 'inotify', - 'netlink', - 'sugar_stats', - '0install', - ]) + for log_name in ( + 'requests.packages.urllib3.connectionpool', + 'requests.packages.urllib3.poolmanager', + 'requests.packages.urllib3.response', + 'requests.packages.urllib3', + 'inotify', + 'netlink', + ): + logger = logging.getLogger(log_name) + logger.propagate = False + logger.addHandler(_NullHandler()) root_logger = logging.getLogger('') for i in root_logger.handlers: @@ -196,6 +152,24 @@ def init_logging(debug_level=None, **kwargs): format='%(asctime)s %(levelname)s %(name)s: %(message)s', **kwargs) + def exception(self, *args): + from traceback import format_exception + + klass, error, tb = sys.exc_info() + tb = [i.rstrip() for i in format_exception(klass, error, tb)] + error_message = str(error) or '%s exception' % type(error).__name__ + if args: + if len(args) == 1: + message = args[0] + else: + message = args[0] % args[1:] + error_message = '%s: %s' % (message, error_message) + + self.error(error_message) + self.debug('\n'.join(tb)) + + logging.Logger.exception = exception + def iter_file(*path): with file(join(*path), 'rb') as f: @@ -661,3 +635,7 @@ def _nb_read(stream): return '' finally: fcntl.fcntl(fd, fcntl.F_SETFL, orig_flags) + + +logging.Logger.trace = lambda self, message, *args, **kwargs: None +logging.Logger.heartbeat = lambda self, message, *args, **kwargs: None diff --git a/sugar_network/toolkit/gbus.py b/sugar_network/toolkit/gbus.py index e1b24eb..8b64bf5 100644 --- a/sugar_network/toolkit/gbus.py +++ b/sugar_network/toolkit/gbus.py @@ -19,7 +19,7 @@ import json import struct import logging -from sugar_network.toolkit import coroutine, exception +from sugar_network.toolkit import coroutine _logger = logging.getLogger('gbus') @@ -65,7 +65,7 @@ def pipe(op, *args, **kwargs): try: op(feedback, *args, **kwargs) except Exception: - exception('Failed to call %r(%r, %r)', op, args, kwargs) + _logger.exception('Failed to call %r(%r, %r)', op, args, kwargs) os.close(fd_w) _logger.trace('Pipe %s(%r, %r)', op, args, kwargs) diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 9dd437e..0ebee86 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -13,13 +13,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import sys import json import types -import hashlib import logging -from os.path import join, dirname, exists, expanduser, abspath +from os.path import join, dirname from sugar_network import toolkit from sugar_network.toolkit import i18n, enforce @@ -112,13 +110,12 @@ class Connection(object): _Session = None - def __init__(self, url='', auth=None, max_retries=0, **session_args): + def __init__(self, url='', creds=None, max_retries=0, **session_args): self.url = url - self.auth = auth + self.creds = creds self._max_retries = max_retries self._session_args = session_args self._session = None - self._nonce = None def __repr__(self): return '<Connection url=%s>' % self.url @@ -185,8 +182,8 @@ class Connection(object): f.close() return reply - def upload(self, path, data, **kwargs): - reply = self.request('POST', path, data, params=kwargs) + def upload(self, path_=None, data_=None, **kwargs): + reply = self.request('POST', path_, data_, params=kwargs) if reply.headers.get('Content-Type') == 'application/json': return json.loads(reply.content) else: @@ -206,13 +203,21 @@ class Connection(object): self._session.cookies.clear() try_ = 0 + challenge = None while True: try_ += 1 reply = self._session.request(method, path, data=data, headers=headers, params=params, **kwargs) if reply.status_code == Unauthorized.status_code: - enforce(self.auth is not None, Unauthorized, 'No credentials') - self._authenticate(reply.headers.get('www-authenticate')) + enforce(self.creds is not None, Unauthorized, 'No credentials') + challenge_ = reply.headers.get('www-authenticate') + if challenge and challenge == challenge_: + profile = self.creds.profile + enforce(profile, Unauthorized, 'No way to self-register') + _logger.info('Register on the server') + self.post(['user'], profile) + challenge = challenge_ + self._session.headers.update(self.creds.logon(challenge)) try_ = 0 elif reply.status_code == 200 or \ allowed and reply.status_code in allowed: @@ -319,90 +324,6 @@ class Connection(object): setattr(self._session, arg, value) self._session.stream = True - def _authenticate(self, challenge): - from urllib2 import parse_http_list, parse_keqv_list - - nonce = None - if challenge: - challenge = challenge.split(' ', 1)[-1] - nonce = parse_keqv_list(parse_http_list(challenge)).get('nonce') - - if self._nonce and nonce == self._nonce: - enforce(self.auth.profile(), Unauthorized, 'Bad credentials') - _logger.info('Register on the server') - self.post(['user'], self.auth.profile()) - - self._session.headers['authorization'] = self.auth(nonce) - self._nonce = nonce - - -class SugarAuth(object): - - def __init__(self, key_path, profile=None): - self._key_path = abspath(expanduser(key_path)) - self._profile = profile or {} - self._key = None - self._pubkey = None - self._login = None - - @property - def pubkey(self): - if self._pubkey is None: - self.ensure_key() - from M2Crypto.BIO import MemoryBuffer - buf = MemoryBuffer() - self._key.save_pub_key_bio(buf) - self._pubkey = buf.getvalue() - return self._pubkey - - @property - def login(self): - if self._login is None: - self._login = str(hashlib.sha1(self.pubkey).hexdigest()) - return self._login - - def profile(self): - if 'name' not in self._profile: - self._profile['name'] = self.login - self._profile['pubkey'] = self.pubkey - return self._profile - - def __call__(self, nonce): - self.ensure_key() - data = hashlib.sha1('%s:%s' % (self.login, nonce)).digest() - signature = self._key.sign(data).encode('hex') - return 'Sugar username="%s",nonce="%s",signature="%s"' % \ - (self.login, nonce, signature) - - def ensure_key(self): - from M2Crypto import RSA - from base64 import b64encode - - key_dir = dirname(self._key_path) - if exists(self._key_path): - if os.stat(key_dir).st_mode & 077: - os.chmod(key_dir, 0700) - self._key = RSA.load_key(self._key_path) - return - - if not exists(key_dir): - os.makedirs(key_dir) - os.chmod(key_dir, 0700) - - _logger.info('Generate RSA private key at %r', self._key_path) - self._key = RSA.gen_key(1024, 65537, lambda *args: None) - self._key.save_key(self._key_path, cipher=None) - os.chmod(self._key_path, 0600) - - pub_key_path = self._key_path + '.pub' - with file(pub_key_path, 'w') as f: - f.write('ssh-rsa %s %s@%s' % ( - b64encode('\x00\x00\x00\x07ssh-rsa%s%s' % self._key.pub()), - self.login, - os.uname()[1], - )) - _logger.info('Saved RSA public key at %r', pub_key_path) - class _Subscription(object): @@ -431,8 +352,9 @@ class _Subscription(object): except Exception: if try_ == 0: raise - toolkit.exception('Failed to read from %r subscription, ' - 'will resubscribe', self._client.url) + _logger.exception( + 'Failed to read from %r subscription, resubscribe', + self._client.url) self._content = None return _parse_event(line) diff --git a/sugar_network/toolkit/mountpoints.py b/sugar_network/toolkit/mountpoints.py index 28076d7..f8324fa 100644 --- a/sugar_network/toolkit/mountpoints.py +++ b/sugar_network/toolkit/mountpoints.py @@ -19,7 +19,7 @@ from os.path import join, exists from sugar_network.toolkit.inotify import Inotify, \ IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM -from sugar_network.toolkit import coroutine, exception +from sugar_network.toolkit import coroutine _COMPLETE_MOUNT_TIMEOUT = 3 @@ -96,4 +96,4 @@ def _call(path, filename, cb): try: cb(path) except Exception: - exception(_logger, 'Cannot call %r for %r mount', cb, path) + _logger.exception('Cannot call %r for %r mount', cb, path) diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py index f09bdb5..9d583cd 100644 --- a/sugar_network/toolkit/parcel.py +++ b/sugar_network/toolkit/parcel.py @@ -99,7 +99,10 @@ def encode(packets, limit=None, header=None, compresslevel=None, blob_len = 0 if isinstance(record, File): blob_len = record.size - chunk = ostream.write_record(record, + chunk = record.meta + else: + chunk = record + chunk = ostream.write_record(chunk, None if finalizing else limit - blob_len) if chunk is None: _logger.debug('Reach the encoding limit') diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 8eb84da..e9e91fd 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -16,7 +16,6 @@ import os import cgi import json -import time import types import logging import calendar @@ -33,7 +32,6 @@ from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import i18n, http, coroutine, enforce -_SIGNATURE_LIFETIME = 600 _NOT_SET = object() _logger = logging.getLogger('router') @@ -106,15 +104,6 @@ class ACL(object): } -class Unauthorized(http.Unauthorized): - - def __init__(self, message, nonce=None): - http.Unauthorized.__init__(self, message) - if not nonce: - nonce = int(time.time()) + _SIGNATURE_LIFETIME - self.headers = {'www-authenticate': 'Sugar nonce="%s"' % nonce} - - class Request(dict): def __init__(self, environ=None, method=None, path=None, cmd=None, @@ -133,7 +122,6 @@ class Request(dict): self._accept_language = _NOT_SET self._content_stream = content_stream or _NOT_SET self._content_type = content_type or _NOT_SET - self._authorization = _NOT_SET if environ: url = environ.get('PATH_INFO', '').strip('/') @@ -299,28 +287,6 @@ class Request(dict): self._dirty_query = False return self.environ.get('QUERY_STRING') - @property - def authorization(self): - if self._authorization is _NOT_SET: - auth = self.environ.get('HTTP_AUTHORIZATION') - if not auth: - self._authorization = None - else: - auth = self._authorization = _Authorization(auth) - auth.scheme, creds = auth.strip().split(' ', 1) - auth.scheme = auth.scheme.lower() - if auth.scheme == 'basic': - auth.login, auth.password = b64decode(creds).split(':') - elif auth.scheme == 'sugar': - from urllib2 import parse_http_list, parse_keqv_list - creds = parse_keqv_list(parse_http_list(creds)) - auth.login = creds['username'] - auth.signature = creds['signature'] - auth.nonce = int(creds['nonce']) - else: - raise http.BadRequest('Unsupported authentication scheme') - return self._authorization - def add(self, key, *values): existing_value = self.get(key) for value in values: @@ -418,18 +384,29 @@ class Response(CaseInsensitiveDict): return '<Response %r>' % items -class File(CaseInsensitiveDict): +class File(str): AWAY = None class Digest(str): pass - def __init__(self, path, digest=None, meta=None): - CaseInsensitiveDict.__init__(self, meta or []) + def __new__(cls, path=None, digest=None, meta=None): + meta = CaseInsensitiveDict(meta or []) + + url = '' + if meta: + url = meta.get('location') + if not url and digest: + url = '%s/blobs/%s' % (this.request.static_prefix, digest) + self = str.__new__(cls, url) + + self.meta = meta self.path = path self.digest = File.Digest(digest) if digest else None - self._stat = None + self.stat = None + + return self @property def exists(self): @@ -437,47 +414,37 @@ class File(CaseInsensitiveDict): @property def size(self): - if self._stat is None: + if self.stat is None: if not self.exists: - size = self.get('content-length', 0) + size = self.meta.get('content-length', 0) return int(size) if size else 0 - self._stat = os.stat(self.path) - return self._stat.st_size + self.stat = os.stat(self.path) + return self.stat.st_size @property def mtime(self): - if self._stat is None: - self._stat = os.stat(self.path) - return int(self._stat.st_mtime) - - @property - def url(self): - if self is File.AWAY: - return '' - return self.get('location') or \ - '%s/blobs/%s' % (this.request.static_prefix, self.digest) + if self.stat is None: + self.stat = os.stat(self.path) + return int(self.stat.st_mtime) @property def name(self): if self.path: return basename(self.path) - def __repr__(self): - return '<File %r>' % self.url - def iter_content(self): if self.path: return self._iter_content() - url = self.get('location') + url = self.meta.get('location') enforce(url, http.NotFound, 'No location') blob = this.http.request('GET', url, allow_redirects=True, # Request for uncompressed data headers={'accept-encoding': ''}) - self.clear() + self.meta.clear() for tag in ('content-length', 'content-type', 'content-disposition'): value = blob.headers.get(tag) if value: - self[tag] = value + self.meta[tag] = value return blob.iter_content(toolkit.BUFFER_SIZE) def _iter_content(self): @@ -544,8 +511,7 @@ class Router(object): this.call = self.call - def call(self, request=None, response=None, environ=None, principal=None, - **kwargs): + def call(self, request=None, response=None, environ=None, **kwargs): if request is None: if this.request is not None: if not environ: @@ -558,9 +524,7 @@ class Router(object): ): if key in this.request.environ: environ[key] = this.request.environ[key] - if not principal: - principal = this.request.principal - request = Request(environ=environ, principal=principal, **kwargs) + request = Request(environ=environ, **kwargs) if response is None: response = Response() @@ -583,15 +547,10 @@ class Router(object): 'Cannot typecast %r argument: %s' % (arg, error)) kwargs = {} for arg in route_.kwarg_names: - if arg == 'request': - kwargs[arg] = request - elif arg == 'response': - kwargs[arg] = response - elif arg not in kwargs: - kwargs[arg] = request.get(arg) + kwargs[arg] = request.get(arg) for i in self._preroutes: - i(route_, request, response) + i(route_) result = None exception = None try: @@ -609,7 +568,7 @@ class Router(object): raise finally: for i in self._postroutes: - i(request, response, result, exception) + i(result, exception) return result @@ -638,9 +597,10 @@ class Router(object): result = self.call(request, response) if isinstance(result, File): - response.update(result) - if 'location' in result: - raise http.Redirect(result['location']) + enforce(result is not File.AWAY, http.NotFound, 'No such file') + response.update(result.meta) + if 'location' in result.meta: + raise http.Redirect(result.meta['location']) enforce(isfile(result.path), 'No such file') if request.if_modified_since and \ result.mtime <= request.if_modified_since: @@ -663,7 +623,7 @@ class Router(object): if error.headers: response.update(error.headers) except Exception, error: - toolkit.exception('Error while processing %r request', request.url) + _logger.exception('Error while processing %r request', request.url) if isinstance(error, http.Status): response.status = error.status response.update(error.headers or {}) @@ -946,7 +906,7 @@ class _Route(object): if hasattr(callback, 'func_code'): code = callback.func_code # `1:` is for skipping the first, `self` or `cls`, argument - self.kwarg_names = code.co_varnames[1:code.co_argcount] + self.kwarg_names = set(code.co_varnames[1:code.co_argcount]) def __repr__(self): path = '/'.join(['*' if i is None else i for i in self.path]) @@ -955,12 +915,4 @@ class _Route(object): return '%s /%s (%s)' % (self.method, path, self.callback.__name__) -class _Authorization(str): - scheme = None - login = None - password = None - signature = None - nonce = None - - File.AWAY = File(None) diff --git a/sugar_network/toolkit/spec.py b/sugar_network/toolkit/spec.py index bd852d4..b3f83e9 100644 --- a/sugar_network/toolkit/spec.py +++ b/sugar_network/toolkit/spec.py @@ -20,7 +20,7 @@ from os.path import join, exists, dirname from ConfigParser import ConfigParser from sugar_network.toolkit.licenses import GOOD_LICENSES -from sugar_network.toolkit import exception, enforce +from sugar_network.toolkit import enforce EMPTY_LICENSE = 'License is not specified' @@ -104,7 +104,6 @@ def parse_version(version_string, ignore_errors=False): else: parts[x] = [] # (because ''.split('.') == [''], not []) except ValueError as error: - exception() raise ValueError('Invalid version format in "%s": %s' % (version_string, error)) except KeyError as error: |