diff options
Diffstat (limited to 'sugar_network/db/routes.py')
-rw-r--r-- | sugar_network/db/routes.py | 452 |
1 files changed, 185 insertions, 267 deletions
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index 19ad26c..2f8fc69 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -1,4 +1,4 @@ -# Copyright (C) 2011-2013 Aleksey Lim +# Copyright (C) 2011-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -13,21 +13,17 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import re -import sys import time import json -import types -import hashlib import logging from contextlib import contextmanager -from os.path import exists from sugar_network import toolkit -from sugar_network.db.metadata import AggregatedType -from sugar_network.db.metadata import BlobProperty, StoredProperty, LIST_TYPES -from sugar_network.toolkit.router import Blob, ACL, route +from sugar_network.db import files +from sugar_network.db.metadata import Aggregated +from sugar_network.toolkit.router import ACL, route, preroute, fallbackroute +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, enforce @@ -38,62 +34,86 @@ _logger = logging.getLogger('db.routes') class Routes(object): - def __init__(self, volume): + def __init__(self, volume, find_limit=None): self.volume = volume + self._find_limit = find_limit + this.volume = self.volume - @route('POST', [None], - acl=ACL.AUTH, mime_type='application/json') + @preroute + def __preroute__(self, op, request, response): + this.request = request + this.response = response + + @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') def create(self, request): - with self._post(request, ACL.CREATE) as (directory, doc): - event = {} - self.on_create(request, doc.props, event) - if 'guid' not in doc.props: - doc.props['guid'] = toolkit.uuid() - doc.guid = doc.props['guid'] - directory.create(doc.props, event) - return doc.guid + with self._post(request, ACL.CREATE) as doc: + self.on_create(request, doc.props) + self.volume[request.resource].create(doc.props) + self.after_post(doc) + return doc['guid'] @route('GET', [None], - arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, + arguments={ + 'offset': int, + 'limit': int, + 'layer': [], + 'reply': ('guid',), + }, mime_type='application/json') - def find(self, request, reply): + def find(self, request, reply, limit, layer): self._preget(request) - documents, total = self.volume[request.resource].find(**request) - result = [self._get_props(i, request, reply) for i in documents] + if self._find_limit: + if limit <= 0: + request['limit'] = self._find_limit + elif limit > self._find_limit: + _logger.warning('The find limit is restricted to %s', + self._find_limit) + request['limit'] = self._find_limit + if 'deleted' in layer: + _logger.warning('Requesting "deleted" layer, will ignore') + layer.remove('deleted') + documents, total = self.volume[request.resource].find( + not_layer='deleted', **request) + result = [self._postget(request, i, reply) for i in documents] return {'total': total, 'result': result} - @route('GET', [None, None], cmd='exists', - mime_type='application/json') + @route('GET', [None, None], cmd='exists', mime_type='application/json') def exists(self, request): directory = self.volume[request.resource] return directory.exists(request.guid) - @route('PUT', [None, None], - acl=ACL.AUTH | ACL.AUTHOR) + @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def update(self, request): - with self._post(request, ACL.WRITE) as (directory, doc): + with self._post(request, ACL.WRITE) as doc: if not doc.props: return - event = {} - self.on_update(request, doc.props, event) - directory.update(doc.guid, doc.props, event) - - @route('PUT', [None, None, None], - acl=ACL.AUTH | ACL.AUTHOR) - def update_prop(self, request, url=None): - if url: - value = Blob({'url': url}) - elif request.content is None: + self.on_update(request, doc.props) + self.volume[request.resource].update(doc.guid, doc.props) + self.after_post(doc) + + @route('GET', [None, None], cmd='diff', mime_type='application/json') + def diff(self, request): + result = {} + res = self.volume[request.resource][request.guid] + for prop, meta, __ in res.diff(toolkit.Sequence([[0, None]])): + result[prop] = meta + return result + + @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR) + def update_prop(self, request): + if request.content is None: value = request.content_stream else: value = request.content request.content = {request.prop: value} self.update(request) - @route('DELETE', [None, None], - acl=ACL.AUTH | ACL.AUTHOR) + @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def delete(self, request): - self.volume[request.resource].delete(request.guid) + # Node data should not be deleted immediately + # to make master-slave synchronization possible + request.content = {'layer': 'deleted'} + self.update(request) @route('GET', [None, None], arguments={'reply': list}, mime_type='application/json') @@ -101,65 +121,56 @@ class Routes(object): if not reply: reply = [] for prop in self.volume[request.resource].metadata.values(): - if prop.acl & ACL.READ and not (prop.acl & ACL.LOCAL): + if prop.acl & ACL.READ and not (prop.acl & ACL.LOCAL) and \ + not isinstance(prop, Aggregated): reply.append(prop.name) self._preget(request) doc = self.volume[request.resource].get(request.guid) - return self._get_props(doc, request, reply) + enforce('deleted' not in doc['layer'], http.NotFound, 'Deleted') + return self._postget(request, doc, reply) @route('GET', [None, None, None], mime_type='application/json') def get_prop(self, request, response): - return self._prop_meta(request, response) + directory = self.volume[request.resource] + doc = directory.get(request.guid) + + prop = directory.metadata[request.prop] + prop.assert_access(ACL.READ) + + meta = doc.meta(prop.name) or {} + if 'value' in meta: + value = _get_prop(doc, prop, meta.pop('value')) + enforce(value is not toolkit.File.AWAY, http.NotFound, 'No blob') + else: + value = prop.default + + response.meta = meta + response.last_modified = meta.get('mtime') + if isinstance(value, toolkit.File): + response.content_length = value.get('size') or 0 + else: + response.content_length = len(json.dumps(value)) + + return value @route('HEAD', [None, None, None]) def get_prop_meta(self, request, response): - self._prop_meta(request, response) + self.get_prop(request, response) @route('POST', [None, None, None], acl=ACL.AUTH, mime_type='application/json') def insert_to_aggprop(self, request): - content = request.content or {} - enforce(isinstance(content, dict), http.BadRequest, 'Invalid value') - - directory = self.volume[request.resource] - prop = directory.metadata[request.prop] + return self._aggpost(request, ACL.INSERT) - enforce(prop.typecast is AggregatedType, http.BadRequest, - 'Property is not aggregated') - prop.assert_access(ACL.INSERT) - self.on_aggprop_update(request, prop, None) - - if request.principal: - authors = content['author'] = {} - self._useradd(authors, request.principal, ACL.ORIGINAL) - guid = content.pop('guid') if 'guid' in content else toolkit.uuid() - props = {request.prop: {guid: content}} - event = {} - self.on_update(request, props, event) - directory.update(request.guid, props, event) - - return guid + @route('PUT', [None, None, None, None], + acl=ACL.AUTH, mime_type='application/json') + def update_aggprop(self, request): + self._aggpost(request, ACL.REPLACE, request.key) @route('DELETE', [None, None, None, None], acl=ACL.AUTH, mime_type='application/json') def remove_from_aggprop(self, request): - directory = self.volume[request.resource] - doc = directory.get(request.guid) - prop = directory.metadata[request.prop] - - enforce(prop.typecast is AggregatedType, http.BadRequest, - 'Property is not aggregated') - prop.assert_access(ACL.REMOVE) - - guid = request.path[3] - enforce(guid in doc[request.prop], http.NotFound, - 'No such aggregated item') - self.on_aggprop_update(request, prop, doc[request.prop][guid]) - - props = {request.prop: {guid: {}}} - event = {} - self.on_update(request, props, event) - directory.update(request.guid, props, event) + self._aggpost(request, ACL.REMOVE, request.key) @route('PUT', [None, None], cmd='useradd', arguments={'role': 0}, acl=ACL.AUTH | ACL.AUTHOR) @@ -180,7 +191,11 @@ class Routes(object): del authors[user] directory.update(request.guid, {'author': authors}) - def on_create(self, request, props, event): + @fallbackroute('GET', ['blobs']) + def blobs(self, request): + return files.get(request.guid) + + def on_create(self, request, props): ts = int(time.time()) props['ctime'] = ts props['mtime'] = ts @@ -189,7 +204,7 @@ class Routes(object): authors = props['author'] = {} self._useradd(authors, request.principal, ACL.ORIGINAL) - def on_update(self, request, props, event): + def on_update(self, request, props): props['mtime'] = int(time.time()) def on_aggprop_update(self, request, prop, value): @@ -200,103 +215,57 @@ class Routes(object): @contextmanager def _post(self, request, access): - content = request.content or {} - enforce(isinstance(content, dict), 'Invalid value') - + content = request.content + enforce(isinstance(content, dict), http.BadRequest, 'Invalid value') directory = self.volume[request.resource] - if request.guid: - doc = directory.get(request.guid) - else: - doc = directory.resource_class(None, {}) - doc.request = request - blobs = [] - - for name, value in content.items(): - prop = directory.metadata[name] - if isinstance(prop, BlobProperty): - prop.assert_access(ACL.CREATE if - access == ACL.WRITE and doc.meta(name) is None - else access) - if value is None: - value = {'blob': None} - elif isinstance(value, basestring) or hasattr(value, 'read'): - value = _read_blob(request, prop, value) - blobs.append(value['blob']) - elif isinstance(value, dict): - enforce('url' in value or 'blob' in value, 'No bundle') - else: - raise RuntimeError('Incorrect BLOB value') - else: - prop.assert_access(access) - if prop.localized and isinstance(value, basestring): - value = {request.accept_language[0]: value} - try: - value = _typecast_prop_value(prop.typecast, value) - except Exception, error: - error = 'Value %r for %r property is invalid: %s' % \ - (value, prop.name, error) - toolkit.exception(error) - raise RuntimeError(error) - doc[name] = value if access == ACL.CREATE: - for name, prop in directory.metadata.items(): - if not isinstance(prop, BlobProperty) and \ - content.get(name) is None and \ - (prop.default is not None or prop.on_set is not None): - doc[name] = prop.default - if doc['guid']: + doc = directory.resource_class(None, None) + if 'guid' in content: # TODO Temporal security hole, see TODO - enforce(not self.volume[request.resource].exists(doc['guid']), - '%s already exists', doc['guid']) - enforce(_GUID_RE.match(doc['guid']) is not None, - 'Malformed %s GUID', doc['guid']) + guid = content['guid'] + enforce(not directory.exists(guid), + http.BadRequest, '%s already exists', guid) + enforce(_GUID_RE.match(guid) is not None, + http.BadRequest, 'Malformed %s GUID', guid) else: - doc['guid'] = toolkit.uuid() + doc.props['guid'] = toolkit.uuid() + for name, prop in directory.metadata.items(): + if name not in content and prop.default is not None: + doc.props[name] = prop.default + orig = None + this.resource = doc + else: + doc = directory.get(request.guid) + orig = directory.get(request.guid) + this.resource = orig - try: - for name, value in doc.props.items(): + def teardown(new): + if orig is None: + return + for name, orig_value in orig.props.items(): + if doc[name] == orig_value: + continue prop = directory.metadata[name] - if prop.on_set is not None: - doc.props[name] = prop.on_set(doc, value) - yield directory, doc - finally: - for path in blobs: - if exists(path): - os.unlink(path) + prop.teardown(doc[name] if new else orig_value) - self.after_post(doc) - - def _prop_meta(self, request, response): - directory = self.volume[request.resource] - prop = directory.metadata[request.prop] - doc = directory.get(request.guid) - doc.request = request - - prop.assert_access(ACL.READ) - - if isinstance(prop, StoredProperty): - meta = doc.meta(prop.name) or {} - if 'value' in meta: - del meta['value'] - value = doc.get(prop.name, request.accept_language) - value = prop.on_get(doc, value) - response.content_length = len(json.dumps(value)) + try: + for name, value in content.items(): + prop = directory.metadata[name] + prop.assert_access(access, orig[name] if orig else None) + try: + doc.props[name] = prop.typecast(value) + except Exception, error: + error = 'Value %r for %r property is invalid: %s' % \ + (value, prop.name, error) + toolkit.exception(error) + raise http.BadRequest(error) + yield doc + except Exception: + teardown(True) + raise else: - value = prop.on_get(doc, doc.meta(prop.name)) - enforce(value is not None and ('blob' in value or 'url' in value), - http.NotFound, 'BLOB does not exist') - if 'blob' in value: - meta = value.copy() - meta.pop('blob') - else: - meta = value - - response.meta = meta - response.last_modified = meta.get('mtime') - response.content_length = meta.get('blob_size') or 0 - - return value + teardown(False) def _preget(self, request): reply = request.get('reply') @@ -307,27 +276,11 @@ class Routes(object): for prop in reply: directory.metadata[prop].assert_access(ACL.READ) - def _get_props(self, doc, request, props): + def _postget(self, request, doc, props): result = {} - metadata = doc.metadata - doc.request = request for name in props: - prop = metadata[name] - value = prop.on_get(doc, doc.get(name, request.accept_language)) - if value is None: - value = prop.default - elif isinstance(value, Blob): - for key in ('mtime', 'seqno', 'blob'): - if key in value: - del value[key] - url = value.get('url') - if url is None: - value['url'] = '/'.join([ - request.static_prefix, metadata.name, doc.guid, name, - ]) - elif url.startswith('/'): - value['url'] = request.static_prefix + url - result[name] = value + prop = doc.metadata[name] + result[name] = _get_prop(doc, prop, doc.get(name)) return result def _useradd(self, authors, user, role): @@ -351,83 +304,48 @@ class Routes(object): props['order'] = 0 authors[user] = props + def _aggpost(self, request, acl, aggid=None): + doc = this.resource = self.volume[request.resource][request.guid] + prop = doc.metadata[request.prop] + enforce(isinstance(prop, Aggregated), http.BadRequest, + 'Property is not aggregated') + prop.assert_access(acl) -def _read_blob(request, prop, value): - digest = hashlib.sha1() - dst = toolkit.NamedTemporaryFile(delete=False) - - try: - if isinstance(value, basestring): - digest.update(value) - dst.write(value) - else: - size = request.content_length or sys.maxint - while size > 0: - chunk = value.read(min(size, toolkit.BUFFER_SIZE)) - if not chunk: - break - dst.write(chunk) - size -= len(chunk) - digest.update(chunk) - except Exception: - os.unlink(dst.name) - raise - finally: - dst.close() - - if request.prop and request.content_type: - mime_type = request.content_type - else: - mime_type = prop.mime_type - - return {'blob': dst.name, - 'digest': digest.hexdigest(), - 'mime_type': mime_type, - } - - -def _typecast_prop_value(typecast, value): - if typecast is None: - return value - enforce(value is not None, ValueError, 'Property value cannot be None') - - def cast(typecast, value): - if isinstance(typecast, types.FunctionType): - return typecast(value) - elif typecast is unicode: - return value.encode('utf-8') - elif typecast is str: - return str(value) - elif typecast is int: - return int(value) - elif typecast is float: - return float(value) - elif typecast is bool: - return bool(value) - elif typecast is dict: - return dict(value) + if aggid and aggid in doc[request.prop]: + aggvalue = doc[request.prop][aggid] + self.on_aggprop_update(request, prop, aggvalue) + prop.subteardown(aggvalue['value']) else: - raise ValueError('Unknown typecast') + enforce(acl != ACL.REMOVE, http.NotFound, 'No aggregated item') + self.on_aggprop_update(request, prop, None) + + aggvalue = {} + if acl != ACL.REMOVE: + value = prop.subtypecast( + request.content_stream if request.content is None + else request.content) + if type(value) is tuple: + aggid_, value = value + enforce(not aggid or aggid == aggid_, http.BadRequest, + 'Wrong aggregated id') + aggid = aggid_ + elif not aggid: + aggid = toolkit.uuid() + aggvalue['value'] = value + + if request.principal: + authors = aggvalue['author'] = {} + role = ACL.ORIGINAL if request.principal in doc['author'] else 0 + self._useradd(authors, request.principal, role) + props = {request.prop: {aggid: aggvalue}} + self.on_update(request, props) + self.volume[request.resource].update(request.guid, props) + + return aggid - if type(typecast) in LIST_TYPES: - if typecast: - first = iter(typecast).next() - else: - first = None - if first is not None and type(first) is not type and \ - type(first) not in LIST_TYPES: - value = cast(type(first), value) - enforce(value in typecast, ValueError, - "Value %r is not in '%s' list", - value, ', '.join([str(i) for i in typecast])) - else: - enforce(len(typecast) <= 1, ValueError, - 'List values should contain values of the same type') - if type(value) not in LIST_TYPES: - value = (value,) - typecast, = typecast or [str] - value = tuple([_typecast_prop_value(typecast, i) for i in value]) - else: - value = cast(typecast, value) +def _get_prop(doc, prop, value): + value = prop.reprcast(value) + if prop.on_get is not None: + value = prop.on_get(doc, value) return value |