diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-27 14:52:25 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-27 14:52:25 (GMT) |
commit | 2dbc9b554f322ea23b224d923d9a6475e33ad6e9 (patch) | |
tree | 55df42ddf7a0ec8d4ca6ef007218b1056409dc0b /sugar_network | |
parent | 046073b04229021ec53833a353ffd069d0a5b561 (diff) |
Implementation polishing
* http.request does not load posting stream before sending;
* one-sigment packets;
* move node related code to node module;
* Principal capabilities;
* batch posting while pushing client offline updates.
Diffstat (limited to 'sugar_network')
30 files changed, 1227 insertions, 922 deletions
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py index 648d418..b985f90 100644 --- a/sugar_network/client/__init__.py +++ b/sugar_network/client/__init__.py @@ -15,7 +15,6 @@ import os import logging -from base64 import b64encode from os.path import join, expanduser, exists from sugar_network.toolkit import http, Option @@ -166,10 +165,15 @@ def stability(context): return value.split() -def Connection(url=None, **args): +def Connection(url=None, creds=None, **kwargs): if url is None: url = api.value - return http.Connection(url, verify=not no_check_certificate.value, **args) + if creds is None and keyfile.value: + from sugar_network.client.auth import SugarCreds + creds = SugarCreds(keyfile.value) + return http.Connection(url, + auth_request={'method': 'GET', 'params': {'cmd': 'logon'}}, + creds=creds, verify=not no_check_certificate.value, **kwargs) def IPCConnection(): diff --git a/sugar_network/client/auth.py b/sugar_network/client/auth.py index db95aa5..c1c86ed 100644 --- a/sugar_network/client/auth.py +++ b/sugar_network/client/auth.py @@ -15,11 +15,15 @@ import os import hashlib +import logging from base64 import b64encode from urllib2 import parse_http_list, parse_keqv_list from os.path import abspath, expanduser, dirname, exists +_logger = logging.getLogger('client.auth') + + class BasicCreds(object): def __init__(self, login, password): diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py index 5a6f894..de0fbf8 100644 --- a/sugar_network/client/journal.py +++ b/sugar_network/client/journal.py @@ -141,13 +141,13 @@ class Routes(object): subrequest = Request(method='PUT', document='artifact', guid=subguid, prop='preview') subrequest.content_type = 'image/png' - with file(preview_path, 'rb') as subrequest.content_stream: + with file(preview_path, 'rb') as subrequest.content: self.fallback(subrequest) subrequest = Request(method='PUT', document='artifact', guid=subguid, prop='data') subrequest.content_type = get(guid, 'mime_type') or 'application/octet' - with file(data_path, 'rb') as subrequest.content_stream: + with file(data_path, 'rb') as subrequest.content: self.fallback(subrequest) def journal_update(self, guid, data=None, **kwargs): diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py index fd85a4d..0c5991f 100644 --- a/sugar_network/client/model.py +++ b/sugar_network/client/model.py @@ -20,8 +20,8 @@ from sugar_network.model.user import User from sugar_network.model.post import Post from sugar_network.model.report import Report from sugar_network.model.context import Context as _Context +from sugar_network.toolkit.router import ACL, File from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit.router import ACL _logger = logging.getLogger('client.model') @@ -43,3 +43,63 @@ class Volume(db.Volume): db.Volume.__init__(self, root, resources) for directory in self.values(): directory.metadata['author'].acl |= ACL.LOCAL + + +def dump_volume(volume): + for resource, directory in volume.items(): + if not directory.has_seqno: + continue + + for doc in directory: + if not doc['seqno'] or doc['state'] != 'active': + continue + + dump = {} + op = dump['op'] = {} + props = dump['content'] = {} + keys = [] + postfix = [] + + for name, prop in doc.metadata.items(): + meta = doc.meta(name) + if meta is None or 'seqno' not in meta: + continue + if isinstance(prop, db.Aggregated): + for aggid, value in doc.repr(name): + aggop = { + 'method': 'POST', + 'path': [resource, doc.guid, name, aggid], + } + if isinstance(value, File): + value.meta['op'] = aggop + postfix.append(value) + else: + postfix.append({'op': aggop, 'content': value}) + elif prop.acl & (ACL.WRITE | ACL.CREATE): + if isinstance(prop, db.Blob): + blob = volume.blobs.get(doc[name]) + blob.meta['op'] = { + 'method': 'PUT', + 'path': [resource, doc.guid, name], + } + postfix.append(blob) + else: + if isinstance(prop, db.Reference): + keys.append(name) + props[name] = doc[name] + + if 'seqno' in doc.meta('guid'): + keys.append('guid') + props['guid'] = doc.guid + op['method'] = 'POST' + op['path'] = [resource] + else: + op['method'] = 'PUT' + op['path'] = [resource, doc.guid] + + if keys: + dump['keys'] = keys + + yield dump + for dump in postfix: + yield dump diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index f618df3..8a037ee 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -20,11 +20,12 @@ from os.path import join from sugar_network import db, client, node, toolkit from sugar_network.model import FrontRoutes +from sugar_network.client import model from sugar_network.client.journal import Routes as JournalRoutes from sugar_network.toolkit.router import Request, Router, Response from sugar_network.toolkit.router import route, fallbackroute from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel +from sugar_network.toolkit import netlink, zeroconf, coroutine, http, packets from sugar_network.toolkit import ranges, lsb_release, enforce @@ -54,10 +55,6 @@ class ClientRoutes(FrontRoutes, JournalRoutes): self._no_subscription = no_subscription self._pull_r = toolkit.Bin( join(home_volume.root, 'var', 'pull'), [[1, None]]) - self._push_r = toolkit.Bin( - join(home_volume.root, 'var', 'push'), [[1, None]]) - self._push_guids_map = toolkit.Bin( - join(home_volume.root, 'var', 'push-guids'), {}) def connect(self, api=None): if self._connect_jobs: @@ -123,7 +120,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes): result = self.fallback() result['route'] = 'proxy' else: - result = {'roles': [], 'route': 'offline'} + result = {'route': 'offline'} result['guid'] = self._creds.login return result @@ -141,7 +138,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes): for logfile in logs: with file(logfile) as f: self.fallback(method='POST', path=['report', guid, 'logs'], - content_stream=f, content_type='text/plain') + content=f, content_type='text/plain') yield {'event': 'done', 'guid': guid} @route('GET', ['context', None], cmd='launch', arguments={'args': list}, @@ -196,7 +193,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes): if pins and item['mtime'] > checkin['mtime']: pull = Request(method='GET', path=[checkin.metadata.name, checkin.guid], cmd='diff') - self._sync_jobs.spawn(self._pull_checkin, pull, None, 'range') + self._sync_jobs.spawn(self._pull_checkin, pull, None, 'ranges') return result @route('GET', [None, None], mime_type='application/json') @@ -353,7 +350,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes): _logger.debug('Checkin %r context', local_context.guid) pull = Request(method='GET', path=['context', local_context.guid], cmd='diff') - self._pull_checkin(pull, None, 'range') + self._pull_checkin(pull, None, 'ranges') pins = local_context['pins'] if pin and pin not in pins: contexts.update(local_context.guid, {'pins': pins + [pin]}) @@ -374,79 +371,69 @@ class ClientRoutes(FrontRoutes, JournalRoutes): def _pull_checkin(self, request, response, header_key): request.headers[header_key] = self._pull_r.value - patch = self.fallback(request, response) - __, committed = self._local.volume.patch(next(parcel.decode(patch)), - shift_seqno=False) - ranges.exclude(self._pull_r.value, committed) + packet = packets.decode(self.fallback(request, response)) - def _sync(self): - _logger.info('Start pulling updates') + volume = self._local.volume + volume[request.resource].patch(request.guid, packet['patch']) + for blob in packet: + volume.blobs.patch(blob) + ranges.exclude(self._pull_r.value, packet['ranges']) + + def _pull(self): + _logger.debug('Start pulling checkin updates') + response = Response() for directory in self._local.volume.values(): if directory.empty: continue request = Request(method='GET', path=[directory.metadata.name], cmd='diff') - response = Response() while True: - request.headers['range'] = self._pull_r.value - r, guids = self.fallback(request, response) - if not r: + request.headers['ranges'] = self._pull_r.value + diff = self.fallback(request, response) + if not diff: break - for guid in guids: + for guid, r in diff.items(): checkin = Request(method='GET', path=[request.resource, guid], cmd='diff') - self._pull_checkin(checkin, response, 'range') - ranges.exclude(self._pull_r.value, r) - self._pull_r.commit() - this.localcast({'event': 'sync', 'state': 'pull'}) - - """ - resource = None - metadata = None - - for diff in self._local.volume.diff(self._push_r.value, blobs=False): - if 'resource' in diff: - resource = diff['resource'] - metadata = self._local.volume[resource] - elif 'commit' in diff: - ranges.exclude(self._push_r.value, diff['commit']) - self._push_r.commit() - # No reasons to keep failure reports after pushing - self._local.volume['report'].wipe() - else: - props = {} - blobs = [] - for prop, meta in diff['patch'].items(): - if isinstance(metadata[prop], db.Blob): - blobs.application - - - - props[prop] = meta['value'] - - - - if isinstance(diff, File): - with file(diff.path, 'rb') as f: - self.fallback(method='POST') + self._pull_checkin(checkin, response, 'ranges') + ranges.exclude(self._pull_r.value, r) + def _push(self): + volume = self._local.volume + _logger.debug('Start pushing offline updates') + dump = packets.encode(model.dump_volume(volume)) + request = Request(method='POST', cmd='apply', content=dump) + self.fallback(request, Response()) + _logger.debug('Wipeout offline updates') + for directory in volume.values(): + if directory.empty: + continue + if directory.has_noseqno: + directory.dilute() + else: + directory.wipe() - pass + _logger.debug('Wipeout offline blobs') + for blob in volume.blobs.walk(): + if int(blob.meta['x-seqno']): + volume.blobs.wipe(blob) - if 'guid' in props: - request = Request(method='POST', path=[resource]) - else: - request = Request(method='PUT', path=[resource, guid]) - request.content_type = 'application/json' - request.content = props - self.fallback(request) - """ + def _sync(self): + try: + self._pull() + if self._local.volume.has_seqno: + self._push() + except: + this.localcast({'event': 'sync', 'state': 'failed'}) + raise + else: + this.localcast({'event': 'sync', 'state': 'done'}) class _LocalRoutes(db.Routes, Router): diff --git a/sugar_network/db/__init__.py b/sugar_network/db/__init__.py index d6b12c5..6a6b27c 100644 --- a/sugar_network/db/__init__.py +++ b/sugar_network/db/__init__.py @@ -351,7 +351,7 @@ Volume from sugar_network.db.metadata import \ stored_property, indexed_property, Property, Numeric, Boolean, Dict, \ - Enum, List, Aggregated, Blob, Localized + Enum, List, Aggregated, Blob, Localized, Reference from sugar_network.db.index import index_flush_timeout, \ index_flush_threshold, index_write_queue from sugar_network.db.resource import Resource diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index ce5bb1b..94e914c 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -43,6 +43,8 @@ class Blobs(object): def path(self, path=None): if path is None: return join(self._root, 'files') + if isinstance(path, File): + return self._blob_path(path.digest) if isinstance(path, basestring): path = path.split(os.sep) if len(path) == 1 and len(path[0]) == 40 and '.' not in path[0]: @@ -51,7 +53,47 @@ class Blobs(object): return join(assets.PATH, *path[1:]) return join(self._root, 'files', *path) + def walk(self, path=None, include=None, recursive=True, all_files=False): + if path is None: + is_files = False + root = self._blob_path() + else: + path = path.strip('/').split('/') + enforce(not [i for i in path if i == '..'], + http.BadRequest, 'Relative paths are not allowed') + is_files = True + root = self.path(path) + + for root, __, files in os.walk(root): + if include is not None and \ + not ranges.contains(include, int(os.stat(root).st_mtime)): + continue + api_path = root[len(self._root) + 7:] if is_files else None + for filename in files: + if filename.endswith(_META_SUFFIX): + if not all_files: + digest = filename[:-len(_META_SUFFIX)] + path = join(root, digest) + yield File(path, digest, _read_meta(path)) + continue + elif not all_files: + continue + yield root, api_path, filename + if not recursive: + break + def post(self, content, mime_type=None, digest_to_assert=None, meta=None): + if isinstance(content, File): + seqno = self._seqno.next() + meta = content.meta.copy() + meta['x-seqno'] = str(seqno) + path = self._blob_path(content.digest) + if not exists(dirname(path)): + os.makedirs(dirname(path)) + os.link(content.path, path) + _write_meta(path, meta, seqno) + return File(path, content.digest, meta) + if meta is None: meta = [] meta.append(('content-type', @@ -94,9 +136,8 @@ class Blobs(object): seqno = self._seqno.next() meta.append(('content-length', str(blob.tell()))) meta.append(('x-seqno', str(seqno))) - _write_meta(path, meta, seqno) blob.name = path - os.utime(path, (seqno, seqno)) + _write_meta(path, meta, seqno) _logger.debug('Post %r file', path) @@ -121,75 +162,67 @@ class Blobs(object): def delete(self, path): self._delete(self.path(path), None) + def wipe(self, path): + path = self.path(path) + if exists(path + _META_SUFFIX): + os.unlink(path + _META_SUFFIX) + if exists(path): + _logger.debug('Wipe %r file', path) + os.unlink(path) + def populate(self, path=None, recursive=True): for __ in self.diff([[1, None]], path or '', recursive): pass def diff(self, r, path=None, recursive=True): - if path is None: - is_files = False - root = self._blob_path() - else: - path = path.strip('/').split('/') - enforce(not [i for i in path if i == '..'], - http.BadRequest, 'Relative paths are not allowed') - is_files = True - root = self.path(path) + is_files = path is not None checkin_seqno = None - for root, __, files in os.walk(root): - if not ranges.contains(r, int(os.stat(root).st_mtime)): - continue - rel_root = root[len(self._root) + 7:] if is_files else None - for filename in files: - path = join(root, filename) - if filename.endswith(_META_SUFFIX): - seqno = int(os.stat(path).st_mtime) - path = path[:-len(_META_SUFFIX)] - meta = None - if exists(path): - stat = os.stat(path) - if seqno != int(stat.st_mtime): - _logger.debug('Found updated %r blob', path) - seqno = self._seqno.next() - meta = _read_meta(path) - meta['x-seqno'] = str(seqno) - meta['content-length'] = str(stat.st_size) - _write_meta(path, meta, seqno) - os.utime(path, (seqno, seqno)) - if not ranges.contains(r, seqno): - continue - if meta is None: + for root, rel_root, filename in self.walk(path, r, recursive, True): + path = join(root, filename) + if filename.endswith(_META_SUFFIX): + seqno = int(os.stat(path).st_mtime) + path = path[:-len(_META_SUFFIX)] + meta = None + if exists(path): + stat = os.stat(path) + if seqno != int(stat.st_mtime): + _logger.debug('Found updated %r blob', path) + seqno = self._seqno.next() meta = _read_meta(path) - if is_files: - digest = join(rel_root, filename[:-len(_META_SUFFIX)]) - meta['path'] = digest - else: - digest = filename[:-len(_META_SUFFIX)] - elif not is_files or exists(path + _META_SUFFIX): + meta['x-seqno'] = str(seqno) + meta['content-length'] = str(stat.st_size) + _write_meta(path, meta, seqno) + if not ranges.contains(r, seqno): continue + if meta is None: + meta = _read_meta(path) + if is_files: + digest = join(rel_root, filename[:-len(_META_SUFFIX)]) + meta['path'] = digest else: - _logger.debug('Found new %r blob', path) - mime_type = mimetypes.guess_type(filename)[0] or \ - 'application/octet-stream' - if checkin_seqno is None: - checkin_seqno = self._seqno.next() - seqno = checkin_seqno - meta = [('content-type', mime_type), - ('content-length', str(os.stat(path).st_size)), - ('x-seqno', str(seqno)), - ] - _write_meta(path, meta, seqno) - os.utime(path, (seqno, seqno)) - if not ranges.contains(r, seqno): - continue - digest = join(rel_root, filename) - meta.append(('path', digest)) - yield File(path, digest, meta) - if not recursive: - break + digest = filename[:-len(_META_SUFFIX)] + elif not is_files or exists(path + _META_SUFFIX): + continue + else: + _logger.debug('Found new %r blob', path) + mime_type = mimetypes.guess_type(filename)[0] or \ + 'application/octet-stream' + if checkin_seqno is None: + checkin_seqno = self._seqno.next() + seqno = checkin_seqno + meta = [('content-type', mime_type), + ('content-length', str(os.stat(path).st_size)), + ('x-seqno', str(seqno)), + ] + _write_meta(path, meta, seqno) + if not ranges.contains(r, seqno): + continue + digest = join(rel_root, filename) + meta.append(('path', digest)) + yield File(path, digest, meta) - def patch(self, patch, seqno): + def patch(self, patch, seqno=0): if 'path' in patch.meta: path = self.path(patch.meta.pop('path')) else: @@ -207,7 +240,6 @@ class Blobs(object): meta = patch.meta meta['x-seqno'] = str(seqno) _write_meta(path, meta, seqno) - os.utime(path, (seqno, seqno)) def _delete(self, path, seqno): if exists(path + _META_SUFFIX): @@ -228,6 +260,8 @@ class Blobs(object): def _write_meta(path, meta, seqno): + if seqno: + os.utime(path, (seqno, seqno)) path += _META_SUFFIX with toolkit.new_file(path) as f: for key, value in meta.items() if isinstance(meta, dict) else meta: diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index 17ff27d..79e7332 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -27,6 +27,10 @@ from sugar_network.toolkit import enforce # To invalidate existed index on stcuture changes _LAYOUT_VERSION = 4 +_STATE_HAS_SEQNO = 1 +_STATE_HAS_NOSEQNO = 2 + + _logger = logging.getLogger('db.directory') @@ -53,12 +57,26 @@ class Directory(object): self._storage = None self._index = None self._broadcast = broadcast + self._state = toolkit.Bin( + join(root, 'index', self.metadata.name, 'state'), 0) self._open() @property def empty(self): - return True if self._index is None else (self._index.mtime == 0) + return not self._state.value & (_STATE_HAS_SEQNO | _STATE_HAS_NOSEQNO) + + @property + def has_seqno(self): + return self._state.value & _STATE_HAS_SEQNO + + @property + def has_noseqno(self): + return self._state.value & _STATE_HAS_NOSEQNO + + def __iter__(self): + for guid in self._storage.walk(0): + yield self.get(guid) def wipe(self): self.close() @@ -67,8 +85,23 @@ class Directory(object): ignore_errors=True) shutil.rmtree(join(self._root, 'db', self.metadata.name), ignore_errors=True) + self._state.value = 0 self._open() + def dilute(self): + for doc in self: + if 'seqno' in doc.record.get('guid'): + self._index.delete(doc.guid, self._postdelete, doc.guid, None) + continue + doc.record.unset('seqno') + for prop in self.metadata.keys(): + meta = doc.record.get(prop) + if meta is None or 'seqno' not in meta: + continue + meta.pop('seqno') + doc.record.set(prop, **meta) + self._state.value ^= _STATE_HAS_SEQNO + def close(self): """Flush index write pending queue and close the index.""" if self._index is None: @@ -158,9 +191,9 @@ class Directory(object): """ found = False - migrate = (self._index.mtime == 0) + migrate = self.empty - for guid in self._storage.walk(self._index.mtime): + for guid in self._storage.walk(self._state.mtime): if not found: _logger.info('Start populating %r index', self.metadata.name) found = True @@ -175,7 +208,7 @@ class Directory(object): meta = record.get(name) if meta is not None: props[name] = meta['value'] - self._index.store(guid, props) + self._index.store(guid, props, self._preindex) yield except Exception: _logger.exception('Cannot populate %r in %r, invalidate it', @@ -195,7 +228,7 @@ class Directory(object): for doc in docs: yield doc - def patch(self, guid, patch, seqno=None): + def patch(self, guid, patch, seqno=False): """Apply changes for documents.""" doc = self.resource(guid, self._storage.get(guid)) merged = False @@ -239,6 +272,10 @@ class Directory(object): doc = self.resource(guid, self._storage.get(guid), changes) for prop in self.metadata: enforce(doc[prop] is not None, 'Empty %r property', prop) + if changes.get('seqno'): + self._state.value |= _STATE_HAS_SEQNO + else: + self._state.value |= _STATE_HAS_NOSEQNO return doc def _prestore(self, guid, changes, event): @@ -253,15 +290,21 @@ class Directory(object): return None for prop in self.metadata.keys(): enforce(doc[prop] is not None, 'Empty %r property', prop) + if changes.get('seqno'): + self._state.value |= _STATE_HAS_SEQNO + else: + self._state.value |= _STATE_HAS_NOSEQNO return doc def _postdelete(self, guid, event): self._storage.delete(guid) - self.broadcast(event) + if event: + self.broadcast(event) def _postcommit(self): self._seqno.commit() - self.broadcast({'event': 'commit', 'mtime': self._index.mtime}) + self._state.commit() + self.broadcast({'event': 'commit', 'mtime': self._state.mtime}) def _save_layout(self): path = join(self._root, 'index', self.metadata.name, 'layout') diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py index 0270dd4..b46fe1b 100644 --- a/sugar_network/db/index.py +++ b/sugar_network/db/index.py @@ -13,12 +13,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import re import time import shutil import logging -from os.path import exists, join import xapian @@ -60,21 +58,12 @@ class IndexReader(object): self._db = None self._props = {} self._path = root - self._mtime_path = join(self._path, 'mtime') self._commit_cb = commit_cb for name, prop in self.metadata.items(): if prop.indexed: self._props[name] = prop - @property - def mtime(self): - """UNIX seconds of the last `commit()` call.""" - if exists(self._mtime_path): - return int(os.stat(self._mtime_path).st_mtime) - else: - return 0 - def ensure_open(self): pass @@ -418,17 +407,10 @@ class IndexWriter(IndexReader): self._db.commit() else: self._db.flush() - - checkpoint = time.time() - if exists(self._mtime_path): - os.utime(self._mtime_path, (checkpoint, checkpoint)) - else: - with file(self._mtime_path, 'w'): - pass self._pending_updates = 0 _logger.debug('Commit to %r took %s seconds', - self.metadata.name, checkpoint - ts) + self.metadata.name, time.time() - ts) if self._commit_cb is not None: self._commit_cb() diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index e820fc9..d7d9065 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -149,6 +149,8 @@ class Property(object): enforce(name == 'guid' or prefix != GUID_PREFIX, 'Prefix %r is reserved for internal needs in %r', GUID_PREFIX, name) + enforce(acl ^ ACL.AUTHOR or acl & ACL.AUTH, + 'ACL.AUTHOR without ACL.AUTH') self.setter = None self.on_get = lambda self, x: x @@ -202,6 +204,10 @@ class Property(object): ACL.NAMES[mode], self.name) +class Reference(Property): + pass + + class Boolean(Property): def typecast(self, value): diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 2c2e46b..7560024 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -168,7 +168,7 @@ class Resource(object): def diff(self, r, out_r=None): patch = {} for name, prop in self.metadata.items(): - if name == 'seqno' or prop.acl & (ACL.CALC | ACL.LOCAL): + if name == 'seqno' or prop.acl & ACL.LOCAL: continue meta = self.meta(name) if meta is None: diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index a1bb75e..0ea1305 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -15,21 +15,17 @@ # pylint: disable-msg=W0611 -import re import logging from contextlib import contextmanager from sugar_network import toolkit from sugar_network.db.metadata import Aggregated from sugar_network.toolkit.router import ACL, File -from sugar_network.toolkit.router import route, postroute, fallbackroute +from sugar_network.toolkit.router import route, fallbackroute, preroute from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, parcel, ranges, enforce +from sugar_network.toolkit import http, ranges, enforce -_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$') -_GROUPED_DIFF_LIMIT = 1024 - _logger = logging.getLogger('db.routes') @@ -37,18 +33,12 @@ class Routes(object): def __init__(self, volume, find_limit=None): this.volume = self.volume = volume + this.add_property('resource', _get_resource) self._find_limit = find_limit - @postroute - def postroute(self, result, exception): - request = this.request - if not request.guid: - return result - pull = request.headers['pull'] - if pull is None: - return result - this.response.content_type = 'application/octet-stream' - return self._object_diff(pull) + @preroute + def __preroute__(self, op): + this.reset_property('resource') @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') def create(self): @@ -71,11 +61,7 @@ class Routes(object): @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR) def update_prop(self): request = this.request - if request.content is None: - value = request.content_stream - else: - value = request.content - request.content = {request.prop: value} + request.content = {request.prop: request.content} self.update() @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) @@ -133,17 +119,17 @@ class Routes(object): return self.get_prop() @route('POST', [None, None, None], - acl=ACL.AUTH, mime_type='application/json') + acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json') def insert_to_aggprop(self): return self._aggpost(ACL.INSERT) @route('PUT', [None, None, None, None], - acl=ACL.AUTH, mime_type='application/json') + acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json') def update_aggprop(self): self._aggpost(ACL.REPLACE) @route('DELETE', [None, None, None, None], - acl=ACL.AUTH, mime_type='application/json') + acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json') def remove_from_aggprop(self): self._aggpost(ACL.REMOVE) @@ -180,80 +166,17 @@ class Routes(object): del authors[user] directory.update(request.guid, {'author': authors}) - @route('GET', [None], cmd='diff', mime_type='application/json') - def grouped_diff(self, key): - if not key: - key = 'guid' - in_r = this.request.headers['range'] or [[1, None]] - out_r = [] - diff = set() - - for doc in self.volume[this.request.resource].diff(in_r): - diff.add(doc.guid) - if len(diff) > _GROUPED_DIFF_LIMIT: - break - ranges.include(out_r, doc['seqno'], doc['seqno']) - doc.diff(in_r, out_r) - - return out_r, list(diff) - - @route('GET', [None, None], cmd='diff') - def object_diff(self): - return self._object_diff(this.request.headers['range']) - @fallbackroute('GET', ['blobs']) def blobs(self): return self.volume.blobs.get(this.request.guid) - def _object_diff(self, in_r): - request = this.request - doc = self.volume[request.resource][request.guid] - enforce(doc.exists, http.NotFound, 'Resource not found') - - out_r = [] - if in_r is None: - in_r = [[1, None]] - patch = doc.diff(in_r, out_r) - if not patch: - return parcel.encode([(None, None, [])], compresslevel=0) - - diff = [{'resource': request.resource}, - {'guid': request.guid, 'patch': patch}, - ] - - def add_blob(blob): - if not isinstance(blob, File): - return - seqno = int(blob.meta['x-seqno']) - ranges.include(out_r, seqno, seqno) - diff.append(blob) - - for prop, meta in patch.items(): - prop = doc.metadata[prop] - value = prop.reprcast(meta['value']) - if isinstance(prop, Aggregated): - for __, aggvalue in value: - add_blob(aggvalue) - else: - add_blob(value) - diff.append({'commit': out_r}) - - return parcel.encode([(None, None, diff)], compresslevel=0) - @contextmanager def _post(self, access): content = this.request.content enforce(isinstance(content, dict), http.BadRequest, 'Invalid value') if access == ACL.CREATE: - guid = content.get('guid') - if guid: - enforce(this.principal and this.principal.admin, - http.BadRequest, 'GUID should not be specified') - enforce(_GUID_RE.match(guid) is not None, - http.BadRequest, 'Malformed GUID') - else: - guid = toolkit.uuid() + guid = content.get('guid') or toolkit.uuid() doc = self.volume[this.request.resource][guid] enforce(not doc.exists, 'Resource already exists') doc.posts['guid'] = guid @@ -261,6 +184,8 @@ class Routes(object): if name not in content and prop.default is not None: doc.posts[name] = prop.default else: + enforce('guid' not in content, http.BadRequest, + 'GUID in cannot be changed') doc = self.volume[this.request.resource][this.request.guid] enforce(doc.available, 'Resource not found') this.resource = doc @@ -334,27 +259,16 @@ class Routes(object): 'Property is not aggregated') prop.assert_access(acl) - def enforce_authority(author): - if prop.acl & ACL.AUTHOR: - author = doc['author'] - enforce(not author or this.principal in author or - this.principal and this.principal.admin, - http.Forbidden, 'Authors only') - aggid = request.key if aggid and aggid in doc[request.prop]: aggvalue = doc[request.prop][aggid] - enforce_authority(aggvalue.get('author')) prop.subteardown(aggvalue['value']) else: enforce(acl != ACL.REMOVE, http.NotFound, 'No aggregated item') - enforce_authority(None) aggvalue = {} if acl != ACL.REMOVE: - value = prop.subtypecast( - request.content_stream if request.content is None - else request.content) + value = prop.subtypecast(request.content) if type(value) is tuple: aggid_, value = value enforce(not aggid or aggid == aggid_, http.BadRequest, @@ -373,3 +287,8 @@ class Routes(object): self.volume[request.resource].update(request.guid, doc.posts) return aggid + + +def _get_resource(): + request = this.request + return this.volume[request.resource][request.guid] diff --git a/sugar_network/db/storage.py b/sugar_network/db/storage.py index bbb50db..87d08b3 100644 --- a/sugar_network/db/storage.py +++ b/sugar_network/db/storage.py @@ -132,3 +132,8 @@ class Record(object): # Touch directory to let it possible to crawl it on startup # when index was not previously closed properly os.utime(join(self._root, '..'), (mtime, mtime)) + + def unset(self, prop): + meta_path = join(self._root, prop) + if exists(meta_path): + os.unlink(meta_path) diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 382176c..25ae1bb 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -15,16 +15,14 @@ import os import logging -from copy import deepcopy from os.path import exists, join, abspath from sugar_network import toolkit from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter from sugar_network.db.blobs import Blobs -from sugar_network.toolkit.router import File from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, coroutine, ranges, enforce +from sugar_network.toolkit import http, coroutine, enforce _logger = logging.getLogger('db.volume') @@ -70,6 +68,20 @@ class Volume(dict): return False return True + @property + def has_seqno(self): + for directory in self.values(): + if directory.has_seqno: + return True + return False + + @property + def has_noseqno(self): + for directory in self.values(): + if directory.has_noseqno: + return True + return False + def close(self): """Close operations with the server.""" _logger.info('Closing documents in %r', self._root) @@ -83,74 +95,6 @@ class Volume(dict): for __ in cls.populate(): coroutine.dispatch() - def diff(self, r, exclude=None, files=None, blobs=True, one_way=False): - if exclude: - include = deepcopy(r) - ranges.exclude(include, exclude) - else: - include = r - last_seqno = None - found = False - - try: - for resource, directory in self.items(): - if one_way and directory.resource.one_way: - continue - yield {'resource': resource} - for doc in directory.diff(r): - patch = doc.diff(include) - if patch: - yield {'guid': doc.guid, 'patch': patch} - found = True - last_seqno = max(last_seqno, doc['seqno']) - if blobs: - for blob in self.blobs.diff(include): - seqno = int(blob.meta.pop('x-seqno')) - yield blob - found = True - last_seqno = max(last_seqno, seqno) - for dirpath in files or []: - for blob in self.blobs.diff(include, dirpath): - seqno = int(blob.meta.pop('x-seqno')) - yield blob - found = True - last_seqno = max(last_seqno, seqno) - except StopIteration: - pass - - if found: - commit_r = include if exclude else deepcopy(r) - ranges.exclude(commit_r, last_seqno + 1, None) - ranges.exclude(r, None, last_seqno) - yield {'commit': commit_r} - - def patch(self, records, shift_seqno=True): - directory = None - committed = [] - seqno = None if shift_seqno else False - - for record in records: - if isinstance(record, File): - if seqno is None: - seqno = self.seqno.next() - self.blobs.patch(record, seqno or 0) - continue - resource = record.get('resource') - if resource: - directory = self[resource] - continue - guid = record.get('guid') - if guid is not None: - seqno = directory.patch(guid, record['patch'], seqno) - continue - commit = record.get('commit') - if commit is not None: - ranges.include(committed, commit) - continue - raise http.BadRequest('Malformed patch') - - return seqno, committed - def broadcast(self, event): if not self.mute: if event['event'] == 'commit': diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index 3f6aef1..6a314f5 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -15,22 +15,17 @@ import os import gettext -import logging -import mimetypes from os.path import join import xapian -from sugar_network import toolkit, db +from sugar_network import db from sugar_network.model.routes import FrontRoutes -from sugar_network.toolkit.spec import parse_version, parse_requires -from sugar_network.toolkit.spec import EMPTY_LICENSE -from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit.router import ACL -from sugar_network.toolkit import i18n, http, svg_to_png, enforce +ICON_SIZE = 55 +LOGO_SIZE = 140 + CONTEXT_TYPES = [ 'activity', 'group', 'package', 'book', ] @@ -58,11 +53,6 @@ RESOURCES = ( 'sugar_network.model.user', ) -ICON_SIZE = 55 -LOGO_SIZE = 140 - -_logger = logging.getLogger('model') - class Rating(db.List): @@ -72,229 +62,3 @@ class Rating(db.List): def slotting(self, value): rating = float(value[1]) / value[0] if value[0] else 0 return xapian.sortable_serialise(rating) - - -class Release(object): - - def typecast(self, release): - if this.resource.exists and \ - 'activity' not in this.resource['type'] and \ - 'book' not in this.resource['type']: - return release - if not isinstance(release, dict): - __, release = load_bundle( - this.volume.blobs.post(release, this.request.content_type), - context=this.request.guid) - return release['bundles']['*-*']['blob'], release - - def reprcast(self, release): - return this.volume.blobs.get(release['bundles']['*-*']['blob']) - - def teardown(self, release): - if this.resource.exists and \ - 'activity' not in this.resource['type'] and \ - 'book' not in this.resource['type']: - return - for bundle in release['bundles'].values(): - this.volume.blobs.delete(bundle['blob']) - - def encode(self, value): - return [] - - -def generate_node_stats(volume): - - def calc_rating(**kwargs): - rating = [0, 0] - alldocs, __ = volume['post'].find(**kwargs) - for post in alldocs: - if post['vote']: - rating[0] += 1 - rating[1] += post['vote'] - return rating - - alldocs, __ = volume['context'].find() - for context in alldocs: - rating = calc_rating(type='review', context=context.guid) - volume['context'].update(context.guid, {'rating': rating}) - - alldocs, __ = volume['post'].find(topic='') - for topic in alldocs: - rating = calc_rating(type='feedback', topic=topic.guid) - volume['post'].update(topic.guid, {'rating': rating}) - - -def load_bundle(blob, context=None, initial=False, extra_deps=None): - context_type = None - context_meta = None - release_notes = None - release = {} - version = None - - try: - bundle = Bundle(blob.path, mime_type='application/zip') - except Exception: - context_type = 'book' - if not context: - context = this.request['context'] - version = this.request['version'] - if 'license' in this.request: - release['license'] = this.request['license'] - if isinstance(release['license'], basestring): - release['license'] = [release['license']] - release['stability'] = 'stable' - release['bundles'] = { - '*-*': { - 'blob': blob.digest, - }, - } - else: - context_type = 'activity' - unpack_size = 0 - - with bundle: - changelog = join(bundle.rootdir, 'CHANGELOG') - for arcname in bundle.get_names(): - if changelog and arcname == changelog: - with bundle.extractfile(changelog) as f: - release_notes = f.read() - changelog = None - unpack_size += bundle.getmember(arcname).size - spec = bundle.get_spec() - context_meta = _load_context_metadata(bundle, spec) - - if not context: - context = spec['context'] - else: - enforce(context == spec['context'], - http.BadRequest, 'Wrong context') - if extra_deps: - spec.requires.update(parse_requires(extra_deps)) - - version = spec['version'] - release['stability'] = spec['stability'] - if spec['license'] is not EMPTY_LICENSE: - release['license'] = spec['license'] - release['commands'] = spec.commands - release['requires'] = spec.requires - release['bundles'] = { - '*-*': { - 'blob': blob.digest, - 'unpack_size': unpack_size, - }, - } - blob.meta['content-type'] = 'application/vnd.olpc-sugar' - - enforce(context, http.BadRequest, 'Context is not specified') - enforce(version, http.BadRequest, 'Version is not specified') - release['version'] = parse_version(version) - - doc = this.volume['context'][context] - if initial and not doc.exists: - enforce(context_meta, http.BadRequest, 'No way to initate context') - context_meta['guid'] = context - context_meta['type'] = [context_type] - with this.principal as principal: - principal.admin = True - this.call(method='POST', path=['context'], content=context_meta, - principal=principal) - else: - enforce(doc.available, http.NotFound, 'No context') - enforce(context_type in doc['type'], - http.BadRequest, 'Inappropriate bundle type') - - if 'license' not in release: - releases = doc['releases'].values() - enforce(releases, http.BadRequest, 'License is not specified') - recent = max(releases, key=lambda x: x.get('value', {}).get('release')) - enforce(recent, http.BadRequest, 'License is not specified') - release['license'] = recent['value']['license'] - - _logger.debug('Load %r release: %r', context, release) - - if this.principal in doc['author']: - patch = doc.format_patch(context_meta) - if patch: - this.call(method='PUT', path=['context', context], content=patch, - principal=this.principal) - doc.posts.update(patch) - # TRANS: Release notes title - title = i18n._('%(name)s %(version)s release') - else: - # TRANS: 3rd party release notes title - title = i18n._('%(name)s %(version)s third-party release') - release['announce'] = this.call(method='POST', path=['post'], - content={ - 'context': context, - 'type': 'notification', - 'title': i18n.encode(title, - name=doc['title'], - version=version, - ), - 'message': release_notes or '', - }, - content_type='application/json', principal=this.principal) - - blob.meta['content-disposition'] = 'attachment; filename="%s-%s%s"' % ( - ''.join(i18n.decode(doc['title']).split()), version, - mimetypes.guess_extension(blob.meta.get('content-type')) or '', - ) - this.volume.blobs.update(blob.digest, blob.meta) - - return context, release - - -def _load_context_metadata(bundle, spec): - result = {} - for prop in ('homepage', 'mime_types'): - if spec[prop]: - result[prop] = spec[prop] - result['guid'] = spec['context'] - - try: - from sugar_network.toolkit.sugar import color_svg - - icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon'])) - svg = color_svg(icon_file.read(), result['guid']) - blobs = this.volume.blobs - - result['artefact_icon'] = \ - blobs.post(svg, 'image/svg+xml').digest - result['icon'] = \ - blobs.post(svg_to_png(svg, ICON_SIZE), 'image/png').digest - result['logo'] = \ - blobs.post(svg_to_png(svg, LOGO_SIZE), 'image/png').digest - - icon_file.close() - except Exception: - _logger.exception('Failed to load icon') - - msgids = {} - for prop, confname in [ - ('title', 'name'), - ('summary', 'summary'), - ('description', 'description'), - ]: - if spec[confname]: - msgids[prop] = spec[confname] - result[prop] = {'en': spec[confname]} - with toolkit.mkdtemp() as tmpdir: - for path in bundle.get_names(): - if not path.endswith('.mo'): - continue - mo_path = path.strip(os.sep).split(os.sep) - if len(mo_path) != 5 or mo_path[1] != 'locale': - continue - lang = mo_path[2] - bundle.extract(path, tmpdir) - try: - translation = gettext.translation(spec['context'], - join(tmpdir, *mo_path[:2]), [lang]) - for prop, value in msgids.items(): - msgstr = translation.gettext(value).decode('utf8') - if lang == 'en' or msgstr != value: - result[prop][lang] = msgstr - except Exception: - _logger.exception('Gettext failed to read %r', mo_path[-1]) - - return result diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py index 9153552..cf24650 100644 --- a/sugar_network/model/context.py +++ b/sugar_network/model/context.py @@ -95,17 +95,17 @@ class Context(db.Resource): def previews(self, value): return value - @db.stored_property(db.Aggregated, subtype=model.Release(), - acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE) + @db.stored_property(db.Aggregated, subtype=db.Dict(), + acl=ACL.READ | ACL.LOCAL) def releases(self, value): return value @db.indexed_property(db.Numeric, slot=2, default=0, - acl=ACL.READ | ACL.CALC) + acl=ACL.READ | ACL.LOCAL) def downloads(self, value): return value - @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.CALC) + @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.LOCAL) def rating(self, value): return value diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py index d924617..e0b3b25 100644 --- a/sugar_network/model/post.py +++ b/sugar_network/model/post.py @@ -20,11 +20,12 @@ from sugar_network.toolkit.coroutine import this class Post(db.Resource): - @db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ) + @db.indexed_property(db.Reference, prefix='C', acl=ACL.CREATE | ACL.READ) def context(self, value): return value - @db.indexed_property(prefix='A', default='', acl=ACL.CREATE | ACL.READ) + @db.indexed_property(db.Reference, prefix='A', default='', + acl=ACL.CREATE | ACL.READ) def topic(self, value): return value @@ -42,7 +43,7 @@ class Post(db.Resource): def message(self, value): return value - @db.indexed_property(prefix='R', default='') + @db.indexed_property(db.Reference, prefix='R', default='') def solution(self, value): return value @@ -82,10 +83,10 @@ class Post(db.Resource): return value @db.indexed_property(db.Numeric, slot=2, default=0, - acl=ACL.READ | ACL.CALC) + acl=ACL.READ | ACL.LOCAL) def downloads(self, value): return value - @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.CALC) + @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.LOCAL) def rating(self, value): return value diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py index a434a6d..4f201d5 100644 --- a/sugar_network/model/report.py +++ b/sugar_network/model/report.py @@ -34,7 +34,7 @@ class Report(db.Resource): one_way = True - @db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ) + @db.indexed_property(db.Reference, prefix='C', acl=ACL.CREATE | ACL.READ) def context(self, value): return value diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index 8012853..63c98b1 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -58,7 +58,7 @@ class FrontRoutes(object): # initiate a subscription and do not stuck in waiting for the 1st event yield {'event': 'pong'} - subscription = this.request.content_stream + subscription = this.request.content if subscription is not None: coroutine.spawn(self._wait_for_closing, subscription) diff --git a/sugar_network/node/auth.py b/sugar_network/node/auth.py index 00054f5..d14bde6 100644 --- a/sugar_network/node/auth.py +++ b/sugar_network/node/auth.py @@ -40,19 +40,46 @@ class Unauthorized(http.Unauthorized): class Principal(str): - admin = False - editor = False - translator = False + def __new__(cls, value, caps=0): + if not isinstance(value, basestring): + value, caps = value + self = str.__new__(cls, value) + # pylint: disable-msg=W0212 + self._caps = caps + self._backup = 0 + return self + + @property + def cap_author_override(self): + return self._caps & 1 + + @cap_author_override.setter + def cap_author_override(self, value): + if value: + self._caps |= 1 + else: + self._caps ^= 1 - _backup = None + @property + def cap_create_with_guid(self): + return self._caps & 1 + + @cap_create_with_guid.setter + def cap_create_with_guid(self, value): + if value: + self._caps |= 1 + else: + self._caps ^= 1 def __enter__(self): - self._backup = (self.admin, self.editor, self.translator) + self._backup = self._caps return self def __exit__(self, exc_type, exc_value, traceback): - self.admin, self.editor, self.translator = self._backup - self._backup = None + self._caps = self._backup + + def dump(self): + return self, self._caps class SugarAuth(object): @@ -109,10 +136,8 @@ class SugarAuth(object): for role in self._config.get('permissions', user).split(): role = role.lower() if role == 'admin': - principal.admin = True - elif role == 'editor': - principal.editor = True - elif role == 'translator': - principal.translator = True + principal.cap_author_override = True + principal.cap_create_with_guid = True + # TODO return principal diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index c5b15e6..c94d047 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -19,15 +19,14 @@ from urlparse import urlsplit from sugar_network import toolkit from sugar_network.model.post import Post from sugar_network.model.report import Report -from sugar_network.node.model import User, Context -from sugar_network.node import obs, master_api +from sugar_network.node import obs, master_api, model from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, parcel, pylru, ranges, enforce +from sugar_network.toolkit import http, packets, pylru, ranges, enforce -RESOURCES = (User, Context, Post, Report) +RESOURCES = (model.User, model.Context, Post, Report) _logger = logging.getLogger('node.master') @@ -40,20 +39,20 @@ class MasterRoutes(NodeRoutes): @route('POST', cmd='sync', arguments={'accept_length': int}) def sync(self, accept_length): - return parcel.encode(self._push() + (self._pull() or []), + return packets.encode(self._push() + (self._pull() or []), limit=accept_length, header={'from': self.guid}, on_complete=this.cookie.clear) @route('POST', cmd='push') def push(self): - return parcel.encode(self._push(), header={'from': self.guid}) + return packets.encode(self._push(), header={'from': self.guid}) @route('GET', cmd='pull', arguments={'accept_length': int}) def pull(self, accept_length): reply = self._pull() if reply is None: return None - return parcel.encode(reply, limit=accept_length, + return packets.encode(reply, limit=accept_length, header={'from': self.guid}, on_complete=this.cookie.clear) @route('PUT', ['context', None], cmd='presolve', @@ -72,13 +71,13 @@ class MasterRoutes(NodeRoutes): cookie = this.cookie reply = [] - for packet in parcel.decode( - this.request.content_stream, this.request.content_length): + for packet in packets.decode( + this.request.content, this.request.content_length): sender = packet['from'] enforce(packet['to'] == self.guid, http.BadRequest, 'Misaddressed packet') if packet.name == 'push': - seqno, push_r = this.volume.patch(packet) + seqno, push_r = model.patch_volume(packet) ack_r = [] if seqno is None else [[seqno, seqno]] ack = {'ack': ack_r, 'ranges': push_r, 'to': sender} reply.append(('ack', ack, None)) @@ -129,7 +128,7 @@ class MasterRoutes(NodeRoutes): r = reduce(lambda x, y: ranges.intersect(x, y), acked.values()) ranges.include(exclude, r) - push = this.volume.diff(pull_r, exclude, one_way=True, files=['']) + push = model.diff_volume(pull_r, exclude, one_way=True, files=['']) reply.append(('push', None, push)) return reply diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 144dab0..f178913 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -13,19 +13,32 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import json import bisect import hashlib import logging +import gettext +import mimetypes +from copy import deepcopy from os.path import join from sugar_network import db, toolkit -from sugar_network.model import Release, context as _context, user as _user +from sugar_network.model import context as _context, user as _user +from sugar_network.model import ICON_SIZE, LOGO_SIZE from sugar_network.node import obs -from sugar_network.toolkit.router import ACL -from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import spec, sat, http, coroutine, i18n, enforce +from sugar_network.node.auth import Principal +from sugar_network.toolkit.router import ACL, File, Request, Response +from sugar_network.toolkit.coroutine import Queue, this +from sugar_network.toolkit.spec import EMPTY_LICENSE, ensure_version +from sugar_network.toolkit.spec import parse_requires, parse_version +from sugar_network.toolkit.bundle import Bundle +from sugar_network.toolkit import sat, http, i18n, ranges, packets +from sugar_network.toolkit import svg_to_png, enforce +BATCH_SUFFIX = '.meta' + _logger = logging.getLogger('node.model') _presolve_queue = None @@ -36,73 +49,42 @@ class User(_user.User): self.posts['guid'] = str(hashlib.sha1(self['pubkey']).hexdigest()) -class _Release(Release): +class _ReleaseValue(dict): - _package_cast = db.Dict(db.List()) + guid = None - def typecast(self, value): - if not this.resource.exists or 'package' not in this.resource['type']: - return Release.typecast(self, value) - - value = self._package_cast.typecast(value) - enforce(value.get('binary'), http.BadRequest, 'No binary aliases') - - distro = this.request.key - if distro == '*': - lsb_id = None - lsb_release = None - elif '-' in this.request.key: - lsb_id, lsb_release = distro.split('-', 1) - else: - lsb_id = distro - lsb_release = None - releases = this.resource.record.get('releases') - resolves = releases['value'].setdefault('resolves', {}) - to_presolve = [] - - for repo in obs.get_repos(): - if lsb_id and lsb_id != repo['lsb_id'] or \ - lsb_release and lsb_release != repo['lsb_release']: - continue - # Make sure there are no alias overrides - if not lsb_id and repo['lsb_id'] in releases['value'] or \ - not lsb_release and repo['name'] in releases['value']: - continue - pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], []) - version = None - try: - for arch in repo['arches']: - version = obs.resolve(repo['name'], arch, pkgs)['version'] - except Exception, error: - _logger.warning('Failed to resolve %r on %s', - pkgs, repo['name']) - resolve = {'status': str(error)} - else: - to_presolve.append((repo['name'], pkgs)) - resolve = { - 'version': spec.parse_version(version), - 'packages': pkgs, - 'status': 'success', - } - resolves.setdefault(repo['name'], {}).update(resolve) - if to_presolve and _presolve_queue is not None: - _presolve_queue.put(to_presolve) - if resolves: - this.resource.record.set('releases', **releases) +class _Release(object): - return value + _package_subcast = db.Dict(db.List()) + + def typecast(self, value): + if isinstance(value, _ReleaseValue): + return value.guid, value + doc = this.volume['context'][this.request.guid] + if 'package' in doc['type']: + value = _ReleaseValue(self._package_subcast.typecast(value)) + value.guid = this.request.key + _resolve_package_alias(doc, value) + return value + bundle = this.volume.blobs.post(value, this.request.content_type) + __, value = load_bundle(bundle, context=this.request.guid) + return value.guid, value + + def encode(self, value): + return [] def teardown(self, value): - if 'package' not in this.resource['type']: - return Release.teardown(self, value) + if 'bundles' in value: + for bundle in value['bundles'].values(): + this.volume.blobs.delete(bundle['blob']) # TODO Delete presolved files class Context(_context.Context): @db.stored_property(db.Aggregated, subtype=_Release(), - acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE) + acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE | ACL.LOCAL) def releases(self, value): return value @@ -135,6 +117,168 @@ class Volume(db.Volume): self.release_seqno.commit() +def diff_volume(r, exclude=None, files=None, blobs=True, one_way=False): + volume = this.volume + if exclude: + include = deepcopy(r) + ranges.exclude(include, exclude) + else: + include = r + last_seqno = None + found = False + + try: + for resource, directory in volume.items(): + if one_way and directory.resource.one_way: + continue + yield {'resource': resource} + for doc in directory.diff(r): + patch = doc.diff(include) + if patch: + yield {'guid': doc.guid, 'patch': patch} + found = True + last_seqno = max(last_seqno, doc['seqno']) + if blobs: + for blob in volume.blobs.diff(include): + seqno = int(blob.meta.pop('x-seqno')) + yield blob + found = True + last_seqno = max(last_seqno, seqno) + for dirpath in files or []: + for blob in volume.blobs.diff(include, dirpath): + seqno = int(blob.meta.pop('x-seqno')) + yield blob + found = True + last_seqno = max(last_seqno, seqno) + except StopIteration: + pass + + if found: + commit_r = include if exclude else deepcopy(r) + ranges.exclude(commit_r, last_seqno + 1, None) + ranges.exclude(r, None, last_seqno) + yield {'commit': commit_r} + + +def patch_volume(records, shift_seqno=True): + volume = this.volume + directory = None + committed = [] + seqno = None if shift_seqno else False + + for record in records: + if isinstance(record, File): + if seqno is None: + seqno = volume.seqno.next() + volume.blobs.patch(record, seqno or 0) + continue + resource = record.get('resource') + if resource: + directory = volume[resource] + continue + guid = record.get('guid') + if guid is not None: + seqno = directory.patch(guid, record['patch'], seqno) + continue + commit = record.get('commit') + if commit is not None: + ranges.include(committed, commit) + continue + raise http.BadRequest('Malformed patch') + + return seqno, committed + + +def diff_resource(in_r): + request = this.request + enforce(request.resource != 'user', http.BadRequest, + 'Not allowed for User resource') + doc = this.volume[request.resource][request.guid] + enforce(doc.exists, http.NotFound, 'Resource not found') + + out_r = [] + if in_r is None: + in_r = [[1, None]] + patch = doc.diff(in_r, out_r) + if not patch: + return packets.encode([], compresslevel=0) + blobs = [] + + def add_blob(blob): + if not isinstance(blob, File): + return + seqno = int(blob.meta['x-seqno']) + ranges.include(out_r, seqno, seqno) + blobs.append(blob) + + for prop, meta in patch.items(): + prop = doc.metadata[prop] + value = prop.reprcast(meta['value']) + if isinstance(prop, db.Aggregated): + for __, aggvalue in value: + add_blob(aggvalue) + else: + add_blob(value) + + return packets.encode(blobs, patch=patch, ranges=out_r, compresslevel=0) + + +def apply_batch(path): + with file(path + BATCH_SUFFIX) as f: + meta = json.load(f) + principal = Principal(meta['principal']) + principal.cap_create_with_guid = True + only_nums = meta.get('failed') + guid_map = meta.setdefault('guid_map', {}) + failed = meta['failed'] = [] + volume = this.volume + + def map_guid(remote_guid): + local_guid = guid_map.get(remote_guid) + if not local_guid: + if volume[request.resource][remote_guid].exists: + return remote_guid + local_guid = guid_map[remote_guid] = toolkit.uuid() + return local_guid + + with file(path, 'rb') as batch: + num = 0 + for record in packets.decode(batch): + num += 1 + if only_nums and not ranges.contains(only_nums, num): + continue + if isinstance(record, File): + request = Request(**record.meta.pop('op')) + request.content = record + else: + request = Request(**record['op']) + props = record['content'] + keys = record.get('keys') or [] + enforce('guid' not in props or 'guid' in keys, + http.BadRequest, 'Guid values is not mapped') + for key in keys: + enforce(key in props, http.BadRequest, + 'No mapped property value') + props[key] = map_guid(props[key]) + request.content = props + if request.guid and \ + not volume[request.resource][request.guid].exists: + request.guid = map_guid(request.guid) + request.principal = principal + try: + this.call(request, Response()) + except Exception: + _logger.exception('Failed to apply %r', request) + ranges.include(failed, num, num) + + if failed: + with toolkit.new_file(path + BATCH_SUFFIX) as f: + json.dump(meta, f) + else: + os.unlink(path + BATCH_SUFFIX) + os.unlink(path) + + def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, stability=None, requires=None): top_context = volume['context'][top_context] @@ -145,12 +289,12 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, top_cond = [] top_requires = {} if isinstance(requires, basestring): - top_requires.update(spec.parse_requires(requires)) + top_requires.update(parse_requires(requires)) elif requires: for i in requires: - top_requires.update(spec.parse_requires(i)) + top_requires.update(parse_requires(i)) if top_context['dependencies']: - top_requires.update(spec.parse_requires(top_context['dependencies'])) + top_requires.update(parse_requires(top_context['dependencies'])) if top_context.guid in top_requires: top_cond = top_requires.pop(top_context.guid) @@ -173,7 +317,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, for dep, cond in deps.items(): dep_clause = [-v_usage] for v_release in add_context(dep): - if spec.ensure(varset[v_release][1]['version'], cond): + if ensure_version(varset[v_release][1]['version'], cond): dep_clause.append(v_release) clauses.append(dep_clause) @@ -211,7 +355,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, release = release['value'] if release['stability'] not in stability or \ context.guid == top_context.guid and \ - not spec.ensure(release['version'], top_cond): + not ensure_version(release['version'], top_cond): continue bisect.insort(candidates, rate_release(digest, release)) for release in reversed(candidates): @@ -272,12 +416,259 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, def presolve(presolve_path): global _presolve_queue - _presolve_queue = coroutine.Queue() + _presolve_queue = Queue() for repo_name, pkgs in _presolve_queue: obs.presolve(repo_name, pkgs, presolve_path) +def load_bundle(blob, context=None, initial=False, extra_deps=None): + context_type = None + context_meta = None + release_notes = None + version = None + release = _ReleaseValue() + release.guid = blob.digest + + try: + bundle = Bundle(blob.path, mime_type='application/zip') + except Exception: + context_type = 'book' + if not context: + context = this.request['context'] + version = this.request['version'] + if 'license' in this.request: + release['license'] = this.request['license'] + if isinstance(release['license'], basestring): + release['license'] = [release['license']] + release['stability'] = 'stable' + release['bundles'] = { + '*-*': { + 'blob': blob.digest, + }, + } + else: + context_type = 'activity' + unpack_size = 0 + + with bundle: + changelog = join(bundle.rootdir, 'CHANGELOG') + for arcname in bundle.get_names(): + if changelog and arcname == changelog: + with bundle.extractfile(changelog) as f: + release_notes = f.read() + changelog = None + unpack_size += bundle.getmember(arcname).size + spec = bundle.get_spec() + context_meta = _load_context_metadata(bundle, spec) + + if not context: + context = spec['context'] + else: + enforce(context == spec['context'], + http.BadRequest, 'Wrong context') + if extra_deps: + spec.requires.update(parse_requires(extra_deps)) + + version = spec['version'] + release['stability'] = spec['stability'] + if spec['license'] is not EMPTY_LICENSE: + release['license'] = spec['license'] + release['commands'] = spec.commands + release['requires'] = spec.requires + release['bundles'] = { + '*-*': { + 'blob': blob.digest, + 'unpack_size': unpack_size, + }, + } + blob.meta['content-type'] = 'application/vnd.olpc-sugar' + + enforce(context, http.BadRequest, 'Context is not specified') + enforce(version, http.BadRequest, 'Version is not specified') + release['version'] = parse_version(version) + + doc = this.volume['context'][context] + if initial and not doc.exists: + enforce(context_meta, http.BadRequest, 'No way to initate context') + context_meta['guid'] = context + context_meta['type'] = [context_type] + with this.principal as principal: + principal.cap_create_with_guid = True + this.call(method='POST', path=['context'], content=context_meta, + principal=principal) + else: + enforce(doc.available, http.NotFound, 'No context') + enforce(context_type in doc['type'], + http.BadRequest, 'Inappropriate bundle type') + + if 'license' not in release: + releases = doc['releases'].values() + enforce(releases, http.BadRequest, 'License is not specified') + recent = max(releases, key=lambda x: x.get('value', {}).get('release')) + enforce(recent, http.BadRequest, 'License is not specified') + release['license'] = recent['value']['license'] + + _logger.debug('Load %r release: %r', context, release) + + if this.principal in doc['author']: + patch = doc.format_patch(context_meta) + if patch: + this.call(method='PUT', path=['context', context], content=patch, + principal=this.principal) + doc.posts.update(patch) + # TRANS: Release notes title + title = i18n._('%(name)s %(version)s release') + else: + # TRANS: 3rd party release notes title + title = i18n._('%(name)s %(version)s third-party release') + release['announce'] = this.call(method='POST', path=['post'], + content={ + 'context': context, + 'type': 'notification', + 'title': i18n.encode(title, + name=doc['title'], + version=version, + ), + 'message': release_notes or '', + }, + content_type='application/json', principal=this.principal) + + blob.meta['content-disposition'] = 'attachment; filename="%s-%s%s"' % ( + ''.join(i18n.decode(doc['title']).split()), version, + mimetypes.guess_extension(blob.meta.get('content-type')) or '', + ) + this.volume.blobs.update(blob.digest, blob.meta) + + return context, release + + +def generate_node_stats(volume): + + def calc_rating(**kwargs): + rating = [0, 0] + alldocs, __ = volume['post'].find(**kwargs) + for post in alldocs: + if post['vote']: + rating[0] += 1 + rating[1] += post['vote'] + return rating + + alldocs, __ = volume['context'].find() + for context in alldocs: + rating = calc_rating(type='review', context=context.guid) + volume['context'].update(context.guid, {'rating': rating}) + + alldocs, __ = volume['post'].find(topic='') + for topic in alldocs: + rating = calc_rating(type='feedback', topic=topic.guid) + volume['post'].update(topic.guid, {'rating': rating}) + + +def _load_context_metadata(bundle, spec): + result = {} + for prop in ('homepage', 'mime_types'): + if spec[prop]: + result[prop] = spec[prop] + result['guid'] = spec['context'] + + try: + from sugar_network.toolkit.sugar import color_svg + + icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon'])) + svg = color_svg(icon_file.read(), result['guid']) + blobs = this.volume.blobs + + result['artefact_icon'] = \ + blobs.post(svg, 'image/svg+xml').digest + result['icon'] = \ + blobs.post(svg_to_png(svg, ICON_SIZE), 'image/png').digest + result['logo'] = \ + blobs.post(svg_to_png(svg, LOGO_SIZE), 'image/png').digest + + icon_file.close() + except Exception: + _logger.exception('Failed to load icon') + + msgids = {} + for prop, confname in [ + ('title', 'name'), + ('summary', 'summary'), + ('description', 'description'), + ]: + if spec[confname]: + msgids[prop] = spec[confname] + result[prop] = {'en': spec[confname]} + with toolkit.mkdtemp() as tmpdir: + for path in bundle.get_names(): + if not path.endswith('.mo'): + continue + mo_path = path.strip(os.sep).split(os.sep) + if len(mo_path) != 5 or mo_path[1] != 'locale': + continue + lang = mo_path[2] + bundle.extract(path, tmpdir) + try: + translation = gettext.translation(spec['context'], + join(tmpdir, *mo_path[:2]), [lang]) + for prop, value in msgids.items(): + msgstr = translation.gettext(value).decode('utf8') + if lang == 'en' or msgstr != value: + result[prop][lang] = msgstr + except Exception: + _logger.exception('Gettext failed to read %r', mo_path[-1]) + + return result + + +def _resolve_package_alias(doc, value): + enforce(value.get('binary'), http.BadRequest, 'No binary aliases') + + distro = this.request.key + enforce(distro, http.BadRequest, 'No distro in path') + if distro == '*': + lsb_id = None + lsb_release = None + elif '-' in this.request.key: + lsb_id, lsb_release = distro.split('-', 1) + else: + lsb_id = distro + lsb_release = None + releases = doc['releases'] + resolves = releases.get('resolves') or {} + to_presolve = [] + + for repo in obs.get_repos(): + if lsb_id and lsb_id != repo['lsb_id'] or \ + lsb_release and lsb_release != repo['lsb_release']: + continue + # Make sure there are no alias overrides + if not lsb_id and repo['lsb_id'] in releases or \ + not lsb_release and repo['name'] in releases: + continue + pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], []) + version = None + try: + for arch in repo['arches']: + version = obs.resolve(repo['name'], arch, pkgs)['version'] + except Exception, error: + _logger.warning('Failed to resolve %r on %s', + pkgs, repo['name']) + resolve = {'status': str(error)} + else: + to_presolve.append((repo['name'], pkgs)) + resolve = { + 'version': parse_version(version), + 'packages': pkgs, + 'status': 'success', + } + resolves.setdefault(repo['name'], {}).update(resolve) + + if to_presolve and _presolve_queue is not None: + _presolve_queue.put(to_presolve) + doc.post('releases', {'resolves': resolves}) + + _STABILITY_RATES = { 'insecure': 0, 'buggy': 1, diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index ac8a840..ee28e89 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -15,20 +15,28 @@ # pylint: disable-msg=W0611 +import os +import re +import json +import time +import shutil import logging -from os.path import join +from os.path import join, exists -from sugar_network import db -from sugar_network.model import FrontRoutes, load_bundle +from sugar_network import db, toolkit +from sugar_network.model import FrontRoutes from sugar_network.node import model -from sugar_network.toolkit.router import ACL, File -from sugar_network.toolkit.router import route, fallbackroute, preroute +from sugar_network.toolkit.router import ACL, File, Request, Response, route +from sugar_network.toolkit.router import fallbackroute, preroute, postroute from sugar_network.toolkit.spec import parse_requires, parse_version from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, coroutine, enforce +from sugar_network.toolkit import http, coroutine, ranges, packets, enforce +_GROUPED_DIFF_LIMIT = 1024 +_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$') + _logger = logging.getLogger('node.routes') @@ -39,6 +47,10 @@ class NodeRoutes(db.Routes, FrontRoutes): FrontRoutes.__init__(self) self._guid = guid self._auth = auth + self._batch_dir = join(self.volume.root, 'batch') + + if not exists(self._batch_dir): + os.makedirs(self._batch_dir) @property def guid(self): @@ -47,35 +59,49 @@ class NodeRoutes(db.Routes, FrontRoutes): @preroute def preroute(self, op): request = this.request + if request.principal: this.principal = request.principal elif op.acl & ACL.AUTH: this.principal = self._auth.logon(request) else: this.principal = None - if op.acl & ACL.AUTHOR and request.guid: - if not this.principal: - this.principal = self._auth.logon(request) - allowed = this.principal.admin - if not allowed: - if request.resource == 'user': - allowed = (this.principal == request.guid) - else: - doc = self.volume[request.resource].get(request.guid) - allowed = this.principal in doc['author'] + + if op.acl & ACL.AUTHOR and not this.principal.cap_author_override: + if request.resource == 'user': + allowed = (this.principal == request.guid) + else: + allowed = this.principal in this.resource['author'] enforce(allowed, http.Forbidden, 'Authors only') - if op.acl & ACL.SUPERUSER: - if not this.principal: - this.principal = self._auth.logon(request) - enforce(this.principal.admin, http.Forbidden, 'Superusers only') + + if op.acl & ACL.AGG_AUTHOR and not this.principal.cap_author_override: + if this.resource.metadata[request.prop].acl & ACL.AUTHOR: + allowed = this.principal in this.resource['author'] + elif request.key: + value = this.resource[request.prop].get(request.key) + allowed = value is None or this.principal in value['author'] + else: + allowed = True + enforce(allowed, http.Forbidden, 'Authors only') + + @postroute + def postroute(self, result, exception): + request = this.request + if not request.guid: + return result + pull = request.headers['pull'] + if pull is None: + return result + this.response.content_type = 'application/octet-stream' + return model.diff_resource(pull) + + @route('GET', cmd='logon', acl=ACL.AUTH) + def logon(self): + pass @route('GET', cmd='whoami', mime_type='application/json') def whoami(self): - roles = [] - if this.principal and this.principal.admin: - roles.append('root') - return {'roles': roles, - 'guid': this.principal, + return {'guid': this.principal, 'route': 'direct', } @@ -123,9 +149,9 @@ class NodeRoutes(db.Routes, FrontRoutes): mime_type='application/json', acl=ACL.AUTH) def submit_release(self, initial): blob = self.volume.blobs.post( - this.request.content_stream, this.request.content_type) + this.request.content, this.request.content_type) try: - context, release = load_bundle(blob, initial=initial) + context, release = model.load_bundle(blob, initial=initial) except Exception: self.volume.blobs.delete(blob.digest) raise @@ -147,5 +173,54 @@ class NodeRoutes(db.Routes, FrontRoutes): solution = self.solve() return self.volume.blobs.get(solution[this.request.guid]['blob']) + @route('GET', [None, None], cmd='diff') + def diff_resource(self): + return model.diff_resource(this.request.headers['ranges']) + + @route('GET', [None], cmd='diff', mime_type='application/json') + def grouped_diff(self, key): + request = this.request + enforce(request.resource != 'user', http.BadRequest, + 'Not allowed for User resource') + + if not key: + key = 'guid' + in_r = request.headers['ranges'] or [[1, None]] + diff = {} + + for doc in self.volume[request.resource].diff(in_r): + out_r = diff.get(doc[key]) + if out_r is None: + if len(diff) >= _GROUPED_DIFF_LIMIT: + break + out_r = diff[doc[key]] = [] + ranges.include(out_r, doc['seqno'], doc['seqno']) + doc.diff(in_r, out_r) + + return diff + + @route('POST', cmd='apply', acl=ACL.AUTH) + def batched_post(self): + with toolkit.NamedTemporaryFile(dir=self._batch_dir, + prefix=this.principal, delete=False) as batch: + try: + shutil.copyfileobj(this.request.content, batch) + except Exception: + os.unlink(batch.name) + raise + with file(batch.name + '.meta', 'w') as f: + json.dump({'principal': this.principal.dump()}, f) + coroutine.spawn(model.apply_batch, batch.name) + + def create(self): + if this.principal and this.principal.cap_create_with_guid: + guid = this.request.content.get('guid') + enforce(not guid or _GUID_RE.match(guid), http.BadRequest, + 'Malformed GUID') + else: + enforce('guid' not in this.request.content, http.BadRequest, + 'GUID should not be specified') + return db.Routes.create(self) + this.principal = None diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 074ae79..176defd 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -25,15 +25,14 @@ from sugar_network import toolkit from sugar_network.model.context import Context from sugar_network.model.post import Post from sugar_network.model.report import Report -from sugar_network.node.model import User -from sugar_network.node import master_api +from sugar_network.node import master_api, model from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, parcel, ranges, enforce +from sugar_network.toolkit import http, packets, ranges, enforce -RESOURCES = (User, Context, Post, Report) +RESOURCES = (model.User, Context, Post, Report) _logger = logging.getLogger('node.slave') @@ -62,13 +61,13 @@ class SlaveRoutes(NodeRoutes): def online_sync(self, no_pull=False): conn = http.Connection(master_api.value) response = conn.request('POST', - data=parcel.encode(self._export(not no_pull), header={ + data=packets.encode(self._export(not no_pull), header={ 'from': self.guid, 'to': self._master_guid, }), params={'cmd': 'sync'}, headers={'Transfer-Encoding': 'chunked'}) - self._import(parcel.decode(response.raw)) + self._import(packets.decode(response.raw)) @route('POST', cmd='offline_sync', acl=ACL.LOCAL) def offline_sync(self, path): @@ -82,7 +81,7 @@ class SlaveRoutes(NodeRoutes): 'event': 'sync_progress', 'progress': _('Reading sneakernet packages'), }) - requests = self._import(parcel.decode_dir(path)) + requests = self._import(packets.decode_dir(path)) this.broadcast({ 'event': 'sync_progress', @@ -91,7 +90,7 @@ class SlaveRoutes(NodeRoutes): offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') if exists(offline_script): shutil.copy(offline_script, path) - parcel.encode_dir(requests + self._export(True), root=path, header={ + packets.encode_dir(requests + self._export(True), root=path, header={ 'from': self.guid, 'to': self._master_guid, }) @@ -110,7 +109,7 @@ class SlaveRoutes(NodeRoutes): sender = packet['from'] from_master = (sender == self._master_guid) if packet.name == 'push': - seqno, committed = this.volume.patch(packet) + seqno, committed = model.patch_volume(packet) if seqno is not None: if from_master: with self._pull_r as r: @@ -136,5 +135,5 @@ class SlaveRoutes(NodeRoutes): export = [] if pull: export.append(('pull', {'ranges': self._pull_r.value}, None)) - export.append(('push', None, self.volume.diff(self._push_r.value))) + export.append(('push', None, model.diff_volume(self._push_r.value))) return export diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 7585e29..bf80271 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -506,7 +506,7 @@ class Bin(object): @property def mtime(self): if exists(self._path): - return os.stat(self._path).st_mtime + return int(os.stat(self._path).st_mtime) else: return 0 @@ -650,7 +650,7 @@ class _NewFile(object): dst_path = None def __init__(self, **kwargs): - self._file = tempfile.NamedTemporaryFile(delete=False, **kwargs) + self._file = NamedTemporaryFile(delete=False, **kwargs) @property def name(self): @@ -666,6 +666,8 @@ class _NewFile(object): def close(self): self._file.close() if exists(self.name): + if not exists(dirname(self.dst_path)): + os.makedirs(dirname(self.dst_path)) os.rename(self.name, self.dst_path) def __enter__(self): diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py index 4a54975..e3a6173 100644 --- a/sugar_network/toolkit/coroutine.py +++ b/sugar_network/toolkit/coroutine.py @@ -303,25 +303,45 @@ class Spooler(object): class _Local(object): + PROPERTY_NOT_SET = object() + def __init__(self): self.attrs = set() + self.properties = {} if hasattr(gevent.getcurrent(), 'local'): current = gevent.getcurrent().local for attr in current.attrs: self.attrs.add(attr) setattr(self, attr, getattr(current, attr)) + self.properties = current.properties class _LocalAccess(object): def __getattr__(self, name): - return getattr(gevent.getcurrent().local, name) + local = gevent.getcurrent().local + value = getattr(local, name) + if value is _Local.PROPERTY_NOT_SET: + value = local.properties[name]() + setattr(local, name, value) + return value def __setattr__(self, name, value): local = gevent.getcurrent().local local.attrs.add(name) - return setattr(local, name, value) + if value is None and name in local.properties: + value = _Local.PROPERTY_NOT_SET + setattr(local, name, value) + + def add_property(self, name, getter): + local = gevent.getcurrent().local + local.properties[name] = getter + setattr(local, name, _Local.PROPERTY_NOT_SET) + + def reset_property(self, name): + local = gevent.getcurrent().local + setattr(local, name, _Local.PROPERTY_NOT_SET) class _Child(object): diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 0cbd535..4096b7c 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,7 +15,6 @@ import sys import json -import types import logging from os.path import join, dirname @@ -110,12 +109,14 @@ class Connection(object): _Session = None - def __init__(self, url='', creds=None, max_retries=0, **session_args): + def __init__(self, url='', creds=None, max_retries=0, auth_request=None, + **session_args): self.url = url self.creds = creds self._max_retries = max_retries self._session_args = session_args self._session = None + self._auth_request = auth_request def __repr__(self): return '<Connection url=%s>' % self.url @@ -146,13 +147,17 @@ class Connection(object): return self._decode_reply(reply) def post(self, path_=None, data_=None, query_=None, **kwargs): - reply = self.request('POST', path_, json.dumps(data_), + if data_ is not None: + data_ = json.dumps(data_) + reply = self.request('POST', path_, data_, headers={'Content-Type': 'application/json'}, params=query_ or kwargs) return self._decode_reply(reply) def put(self, path_=None, data_=None, query_=None, **kwargs): - reply = self.request('PUT', path_, json.dumps(data_), + if data_ is not None: + data_ = json.dumps(data_) + reply = self.request('PUT', path_, data_, headers={'Content-Type': 'application/json'}, params=query_ or kwargs) return self._decode_reply(reply) @@ -182,8 +187,8 @@ class Connection(object): f.close() return reply - def upload(self, path_=None, data_=None, **kwargs): - reply = self.request('POST', path_, data_, params=kwargs) + def upload(self, path_=None, data=None, **kwargs): + reply = self.request('POST', path_, data, params=kwargs) if reply.headers.get('Content-Type') == 'application/json': return json.loads(reply.content) else: @@ -191,6 +196,11 @@ class Connection(object): def request(self, method, path=None, data=None, headers=None, allowed=None, params=None, **kwargs): + if data is not None and self._auth_request: + auth_request = self._auth_request + self._auth_request = None + self.request(**auth_request) + if self._session is None: self._init() @@ -209,6 +219,9 @@ class Connection(object): reply = self._session.request(method, path, data=data, headers=headers, params=params, **kwargs) if reply.status_code == Unauthorized.status_code: + enforce(data is None, + 'Authorization is requited ' + 'but no way to resend posting data') enforce(self.creds is not None, Unauthorized, 'No credentials') challenge_ = reply.headers.get('www-authenticate') if challenge and challenge == challenge_: @@ -218,6 +231,7 @@ class Connection(object): self.post(['user'], profile) challenge = challenge_ self._session.headers.update(self.creds.logon(challenge)) + self._auth_request = None try_ = 0 elif reply.status_code == 200 or \ allowed and reply.status_code in allowed: @@ -228,12 +242,12 @@ class Connection(object): error = json.loads(content)['error'] except Exception: # On non-JSONified fail response, assume that the error - # was not sent by the application level server code, i.e., + # was not sent by the application-level server code, i.e., # something happaned on low level, like connection abort. # If so, try to resend request. - if try_ <= self._max_retries and method in ('GET', 'HEAD'): + if try_ <= self._max_retries and data is None: continue - error = content or reply.headers.get('x-sn-error') or \ + error = content or reply.headers.get('x-error') or \ 'No error message provided' cls = _FORWARD_STATUSES.get(reply.status_code, RuntimeError) \ or ConnectionError @@ -242,24 +256,11 @@ class Connection(object): return reply def call(self, request, response=None): - if request.content_type == 'application/json': - request.content = json.dumps(request.content) - - headers = {} - if request.content is not None: - headers['content-type'] = \ - request.content_type or 'application/octet-stream' - headers['content-length'] = str(len(request.content)) - elif request.content_stream is not None: - headers['content-type'] = \ - request.content_type or 'application/octet-stream' - # TODO Avoid reading the full content at once - if isinstance(request.content_stream, types.GeneratorType): - request.content = ''.join([i for i in request.content_stream]) - else: - request.content = request.content_stream.read() - headers['content-length'] = str(len(request.content)) + headers = { + 'content-type': request.content_type or 'application/octet-stream', + } for env_key, key in ( + ('CONTENT_LENGTH', 'content-length'), ('HTTP_IF_MODIFIED_SINCE', 'if-modified-since'), ('HTTP_ACCEPT_LANGUAGE', 'accept-language'), ('HTTP_ACCEPT_ENCODING', 'accept-encoding'), @@ -269,12 +270,18 @@ class Connection(object): headers[key] = value headers.update(request.headers) + data = None + if request.method in ('POST', 'PUT'): + if request.content_type == 'application/json': + data = json.dumps(request.content) + else: + data = request.content + path = request.path while True: - reply = self.request(request.method, path, - data=request.content, params=request.query or request, - headers=headers, allowed=_REDIRECT_CODES, - allow_redirects=False) + reply = self.request(request.method, path, data=data, + params=request.query or request, headers=headers, + allowed=_REDIRECT_CODES, allow_redirects=False) resend = reply.status_code in _REDIRECT_CODES if response is not None: if 'transfer-encoding' in reply.headers: @@ -293,7 +300,10 @@ class Connection(object): if request.method != 'HEAD': if reply.headers.get('Content-Type') == 'application/json': - return json.loads(reply.content) + if reply.content: + return json.loads(reply.content) + else: + return None else: return reply.raw diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/packets.py index edbbf02..46bc223 100644 --- a/sugar_network/toolkit/parcel.py +++ b/sugar_network/toolkit/packets.py @@ -34,13 +34,13 @@ from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce DEFAULT_COMPRESSLEVEL = 6 -_FILENAME_SUFFIX = '.parcel' +_FILENAME_SUFFIX = '.packet' _RESERVED_DISK_SPACE = 1024 * 1024 _ZLIB_WBITS = 15 _ZLIB_WBITS_SIZE = 32768 # 2 ** 15 -_logger = logging.getLogger('parcel') +_logger = logging.getLogger('packets') def decode(stream, limit=None): @@ -49,26 +49,19 @@ def decode(stream, limit=None): if limit is not None: limit -= 2 magic = stream.read(2) - enforce(len(magic) == 2, http.BadRequest, 'Malformed parcel') + enforce(len(magic) == 2, http.BadRequest, 'Malformed packet') if magic == '\037\213': stream = _ZippedDecoder(stream, limit) else: stream = _Decoder(magic, stream, limit) header = stream.read_record() - packet = _DecodeIterator(stream) - while True: - packet.next() - if packet.name == 'last': - break - packet.header.update(header) - yield packet + return _DecodeIterator(stream, header) -def encode(packets, limit=None, header=None, compresslevel=None, - on_complete=None): - _logger.debug('Encode %r packets limit=%r header=%r', - packets, limit, header) +def encode(items, limit=None, header=None, compresslevel=None, + on_complete=None, **kwargs): + _logger.debug('Encode %r limit=%r header=%r', items, limit, header) if compresslevel is 0: ostream = _Encoder() @@ -82,71 +75,84 @@ def encode(packets, limit=None, header=None, compresslevel=None, if limit is None: limit = sys.maxint if header is None: - header = {} + header = kwargs + else: + header.update(kwargs) chunk = ostream.write_record(header) if chunk: yield chunk - for packet, props, content in packets: - if props is None: - props = {} - props['packet'] = packet - chunk = ostream.write_record(props) - if chunk: - yield chunk - - if content is None: - continue + try: + items = iter(items) + record = next(items) + multisegments = type(record) in (tuple, list) - content = iter(content) - try: - finalizing = False - record = next(content) - while True: - if record is None: - finalizing = True - record = next(content) - continue - blob_len = 0 - if isinstance(record, File): - blob_len = record.size - chunk = record.meta - else: - chunk = record - chunk = ostream.write_record(chunk, - None if finalizing else limit - blob_len) - if chunk is None: - _logger.debug('Reach the encoding limit') - on_complete = None - if not isinstance(content, GeneratorType): - raise StopIteration() - finalizing = True - record = content.throw(StopIteration()) - continue + while True: + if multisegments: + packet, props, content = record + if props is None: + props = {} + props['segment'] = packet + chunk = ostream.write_record(props) if chunk: yield chunk - if blob_len: - for chunk in record.iter_content(): - blob_len -= len(chunk) - if not blob_len: - chunk += '\n' - chunk = ostream.write(chunk) - if chunk: - yield chunk - enforce(blob_len == 0, EOFError, 'Blob size mismatch') - record = next(content) - except StopIteration: - pass + if content: + content = iter(content) + record = next(content) + else: + content = iter([]) + record = None + else: + content = items + + try: + finalizing = False + while True: + if record is None: + finalizing = True + record = next(content) + continue + blob_len = 0 + if isinstance(record, File): + blob_len = record.size + chunk = record.meta + else: + chunk = record + chunk = ostream.write_record(chunk, + None if finalizing else limit - blob_len) + if chunk is None: + _logger.debug('Reach the encoding limit') + on_complete = None + if not isinstance(content, GeneratorType): + raise StopIteration() + finalizing = True + record = content.throw(StopIteration()) + continue + if chunk: + yield chunk + if blob_len: + for chunk in record.iter_content(): + blob_len -= len(chunk) + if not blob_len: + chunk += '\n' + chunk = ostream.write(chunk) + if chunk: + yield chunk + enforce(blob_len == 0, EOFError, 'Blob size mismatch') + record = next(content) + except StopIteration: + pass + if multisegments: + record = next(items) + continue + break + finally: if on_complete is not None: on_complete() - - chunk = ostream.write_record({'packet': 'last'}) - if chunk: - yield chunk - chunk = ostream.flush() - if chunk: - yield chunk + chunk = ostream.flush() + if chunk: + yield chunk def decode_dir(root, recipient=None, session=None): @@ -154,18 +160,19 @@ def decode_dir(root, recipient=None, session=None): for filename in files: if not filename.endswith(_FILENAME_SUFFIX): continue - with file(join(root, filename), 'rb') as parcel: - for packet in decode(parcel): - if recipient is not None and packet['from'] == recipient: - if session and packet['session'] == session: - _logger.debug('Skip the same session %r parcel', - parcel.name) - else: - _logger.debug('Remove outdated %r parcel', - parcel.name) - os.unlink(parcel.name) - break - yield packet + with file(join(root, filename), 'rb') as packets: + packet = decode(packets) + if recipient is not None and packet['from'] == recipient: + if session and packet['session'] == session: + _logger.debug('Skip the same session %r packet', + packets.name) + else: + _logger.debug('Remove outdated %r packet', + packets.name) + os.unlink(packets.name) + continue + for i in packet: + yield i def encode_dir(packets, root=None, limit=None, path=None, sender=None, @@ -182,36 +189,22 @@ def encode_dir(packets, root=None, limit=None, path=None, sender=None, if sender is not None: header['from'] = sender - _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header) + _logger.debug('Creating %r packet limit=%s header=%r', path, limit, header) - with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel: + with toolkit.NamedTemporaryFile(dir=dirname(path)) as f: for chunk in encode(packets, limit, header): - parcel.write(chunk) + f.write(chunk) coroutine.dispatch() - parcel.flush() - os.fsync(parcel.fileno()) - os.rename(parcel.name, path) + f.flush() + os.fsync(f.fileno()) + os.rename(f.name, path) class _DecodeIterator(object): - def __init__(self, stream): + def __init__(self, stream, header): self._stream = stream - self.header = {} - self._name = None - self._shift = True - - @property - def name(self): - return self._name - - def next(self): - if self._shift: - for __ in self: - pass - if self._name is None: - raise EOFError() - self._shift = True + self.header = header def __repr__(self): return '<Packet %r>' % self.header @@ -219,38 +212,70 @@ class _DecodeIterator(object): def __getitem__(self, key): return self.header.get(key) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + def __iter__(self): while True: record = self._stream.read_record() if record is None: - self._name = None - raise EOFError() - if 'packet' in record: - self._name = record['packet'] or '' - self.header = record - self._shift = False break - blob_len = record.get('content-length') - if blob_len is None: - yield record - continue - blob_len = int(blob_len) - with toolkit.NamedTemporaryFile() as blob: - digest = hashlib.sha1() - while blob_len: - chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) - enforce(chunk, 'Blob size mismatch') - blob.write(chunk) - blob_len -= len(chunk) - digest.update(chunk) - blob.flush() - yield File(blob.name, digest=digest.hexdigest(), meta=record) + if 'segment' in record: + while record is not None: + record.update(self.header) + segment = _SegmentIterator(self._stream, record) + yield segment + record = segment.next_segment + if record is not None: + continue + while True: + record = self._stream.read_record() + if record is None or 'segment' in record: + break + break + for i in self._process_record(record): + yield i - def __enter__(self): - return self + def _process_record(self, record): + blob_len = record.get('content-length') + if blob_len is None: + yield record + return - def __exit__(self, exc_type, exc_value, traceback): - pass + blob_len = int(blob_len) + with toolkit.NamedTemporaryFile() as blob: + digest = hashlib.sha1() + while blob_len: + chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) + enforce(chunk, 'Blob size mismatch') + blob.write(chunk) + blob_len -= len(chunk) + digest.update(chunk) + blob.flush() + yield File(blob.name, digest=digest.hexdigest(), meta=record) + + +class _SegmentIterator(_DecodeIterator): + + next_segment = None + + @property + def name(self): + return self.header['segment'] + + def __iter__(self): + while True: + record = self._stream.read_record() + if record is None: + break + if 'segment' in record: + self.next_segment = record + break + for i in self._process_record(record): + yield i class _Encoder(object): @@ -317,16 +342,21 @@ class _Decoder(object): self._buffer = prefix self._stream = stream self._limit = limit + self._eof = False def read_record(self): while True: parts = self._buffer.split('\n', 1) if len(parts) == 1: - if self._read(BUFFER_SIZE): + if self._read(BUFFER_SIZE) and not self._eof: continue - return None - result, self._buffer = parts + result = parts[0] + self._buffer = '' + else: + result, self._buffer = parts if not result: + if self._eof: + return None continue return json.loads(result) @@ -342,7 +372,9 @@ class _Decoder(object): if self._limit is not None: size = min(size, self._limit) chunk = self._stream.read(size) - if chunk and self._limit is not None: + if not chunk: + self._eof = True + elif self._limit is not None: self._limit -= len(chunk) return self._decode(chunk) @@ -365,7 +397,7 @@ class _ZippedDecoder(_Decoder): 'Unknown compression method') enforce(ord(stream.read(1)) == 0, http.BadRequest, 'Gzip flags should be empty') - stream.read(6) # Ignore the rest of header + stream.read(6) # Ignore the rest of ZIP header def _decode(self, chunk): if chunk: diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index f4b23ce..bd5da32 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -88,10 +88,9 @@ class ACL(object): AUTH = 1 << 10 AUTHOR = 1 << 11 - SUPERUSER = 1 << 12 + AGG_AUTHOR = 1 << 12 - LOCAL = 1 << 13 - CALC = 1 << 14 + LOCAL = 1 << 14 NAMES = { CREATE: 'Create', @@ -107,8 +106,7 @@ class ACL(object): class Request(dict): def __init__(self, environ=None, method=None, path=None, cmd=None, - content=None, content_stream=None, content_type=None, - principal=None, **kwargs): + content=None, content_type=None, principal=None, **kwargs): dict.__init__(self) self.path = [] @@ -120,7 +118,6 @@ class Request(dict): self._dirty_query = False self._if_modified_since = _NOT_SET self._accept_language = _NOT_SET - self._content_stream = content_stream or _NOT_SET self._content_type = content_type or _NOT_SET if environ: @@ -194,7 +191,17 @@ class Request(dict): @property def content(self): - self.ensure_content() + if self._content is not _NOT_SET: + return self._content + stream = self.environ.get('wsgi.input') + if stream is None: + self._content = None + else: + stream = _ContentStream(stream, self.content_length) + if self.content_type == 'application/json': + self._content = json.load(stream) + else: + self._content = stream return self._content @content.setter @@ -212,39 +219,41 @@ class Request(dict): self.environ['CONTENT_LENGTH'] = str(value) @property - def content_stream(self): - if self._content_stream is _NOT_SET: - s = self.environ.get('wsgi.input') - if s is None: - self._content_stream = None - else: - self._content_stream = _ContentStream(s, self.content_length) - return self._content_stream - - @content_stream.setter - def content_stream(self, value): - self._content_stream = value - - @property def resource(self): if self.path: return self.path[0] + @resource.setter + def resource(self, value): + self.path[0] = value + @property def guid(self): if len(self.path) > 1: return self.path[1] + @guid.setter + def guid(self, value): + self.path[1] = value + @property def prop(self): if len(self.path) > 2: return self.path[2] + @prop.setter + def prop(self, value): + self.path[2] = value + @property def key(self): if len(self.path) > 3: return self.path[3] + @key.setter + def key(self, value): + self.path[3] = value + @property def static_prefix(self): http_host = self.environ.get('HTTP_HOST') @@ -298,16 +307,6 @@ class Request(dict): else: existing_value = self[key] = [existing_value, value] - def ensure_content(self): - if self._content is not _NOT_SET: - return - if self.content_stream is None: - self._content = None - elif self.content_type == 'application/json': - self._content = json.load(self.content_stream) - else: - self._content = self.content_stream.read() - def __repr__(self): return '<Request method=%s path=%r cmd=%s query=%r>' % \ (self.method, self.path, self.cmd, dict(self)) @@ -539,7 +538,6 @@ class Router(object): if route_.mime_type == 'text/event-stream' and \ self._allow_spawn and 'spawn' in request: _logger.debug('Spawn event stream for %r', request) - request.ensure_content() coroutine.spawn(self._event_stream, request, result) result = None elif route_.mime_type and 'content-type' not in response: @@ -617,29 +615,23 @@ class Router(object): response.content_type = 'application/json' streamed_content = isinstance(content, types.GeneratorType) - - if request.method == 'HEAD': - streamed_content = False - content = None - elif js_callback: + if js_callback or response.content_type == 'application/json': if streamed_content: content = ''.join(content) streamed_content = False - content = '%s(%s);' % (js_callback, json.dumps(content)) - response.content_length = len(content) - elif not streamed_content: - if response.content_type == 'application/json': + else: content = json.dumps(content) - response.content_length = len(content) - elif 'content-length' not in response: - response.content_length = len(content) if content else 0 - if request.method == 'HEAD' and content is not None: - _logger.warning('Content from HEAD response is ignored') + if js_callback: + content = '%s(%s);' % (js_callback, content) + if request.method == 'HEAD': + streamed_content = False content = None - _save_cookie(response, 'sugar_network_node', this.cookie) + elif not streamed_content: + response.content_length = len(content) if content else 0 _logger.trace('%s call: request=%s response=%r content=%r', self, request.environ, response, repr(content)[:256]) + _save_cookie(response, 'sugar_network_node', this.cookie) start_response(response.status, response.items()) if streamed_content: @@ -872,6 +864,13 @@ class _Route(object): def __init__(self, callback, method, path, cmd, mime_type=None, acl=0, arguments=None): + enforce(acl ^ ACL.AUTHOR or acl & ACL.AUTH, + 'ACL.AUTHOR without ACL.AUTH') + enforce(acl ^ ACL.AUTHOR or len(path) >= 2, + 'ACL.AUTHOR requires longer path') + enforce(acl ^ ACL.AGG_AUTHOR or len(path) >= 3, + 'ACL.AGG_AUTHOR requires longer path') + self.op = (method, cmd) self.callback = callback self.method = method diff --git a/sugar_network/toolkit/spec.py b/sugar_network/toolkit/spec.py index b3f83e9..279e748 100644 --- a/sugar_network/toolkit/spec.py +++ b/sugar_network/toolkit/spec.py @@ -174,7 +174,7 @@ def parse_requires(requires): return result -def ensure(version, cond): +def ensure_version(version, cond): if cond: for op, cond_version in cond: if op == [0]: |