Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/db/routes.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/db/routes.py')
-rw-r--r--sugar_network/db/routes.py452
1 files changed, 185 insertions, 267 deletions
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index 19ad26c..2f8fc69 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2011-2013 Aleksey Lim
+# Copyright (C) 2011-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -13,21 +13,17 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
-import sys
import time
import json
-import types
-import hashlib
import logging
from contextlib import contextmanager
-from os.path import exists
from sugar_network import toolkit
-from sugar_network.db.metadata import AggregatedType
-from sugar_network.db.metadata import BlobProperty, StoredProperty, LIST_TYPES
-from sugar_network.toolkit.router import Blob, ACL, route
+from sugar_network.db import files
+from sugar_network.db.metadata import Aggregated
+from sugar_network.toolkit.router import ACL, route, preroute, fallbackroute
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, enforce
@@ -38,62 +34,86 @@ _logger = logging.getLogger('db.routes')
class Routes(object):
- def __init__(self, volume):
+ def __init__(self, volume, find_limit=None):
self.volume = volume
+ self._find_limit = find_limit
+ this.volume = self.volume
- @route('POST', [None],
- acl=ACL.AUTH, mime_type='application/json')
+ @preroute
+ def __preroute__(self, op, request, response):
+ this.request = request
+ this.response = response
+
+ @route('POST', [None], acl=ACL.AUTH, mime_type='application/json')
def create(self, request):
- with self._post(request, ACL.CREATE) as (directory, doc):
- event = {}
- self.on_create(request, doc.props, event)
- if 'guid' not in doc.props:
- doc.props['guid'] = toolkit.uuid()
- doc.guid = doc.props['guid']
- directory.create(doc.props, event)
- return doc.guid
+ with self._post(request, ACL.CREATE) as doc:
+ self.on_create(request, doc.props)
+ self.volume[request.resource].create(doc.props)
+ self.after_post(doc)
+ return doc['guid']
@route('GET', [None],
- arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
+ arguments={
+ 'offset': int,
+ 'limit': int,
+ 'layer': [],
+ 'reply': ('guid',),
+ },
mime_type='application/json')
- def find(self, request, reply):
+ def find(self, request, reply, limit, layer):
self._preget(request)
- documents, total = self.volume[request.resource].find(**request)
- result = [self._get_props(i, request, reply) for i in documents]
+ if self._find_limit:
+ if limit <= 0:
+ request['limit'] = self._find_limit
+ elif limit > self._find_limit:
+ _logger.warning('The find limit is restricted to %s',
+ self._find_limit)
+ request['limit'] = self._find_limit
+ if 'deleted' in layer:
+ _logger.warning('Requesting "deleted" layer, will ignore')
+ layer.remove('deleted')
+ documents, total = self.volume[request.resource].find(
+ not_layer='deleted', **request)
+ result = [self._postget(request, i, reply) for i in documents]
return {'total': total, 'result': result}
- @route('GET', [None, None], cmd='exists',
- mime_type='application/json')
+ @route('GET', [None, None], cmd='exists', mime_type='application/json')
def exists(self, request):
directory = self.volume[request.resource]
return directory.exists(request.guid)
- @route('PUT', [None, None],
- acl=ACL.AUTH | ACL.AUTHOR)
+ @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update(self, request):
- with self._post(request, ACL.WRITE) as (directory, doc):
+ with self._post(request, ACL.WRITE) as doc:
if not doc.props:
return
- event = {}
- self.on_update(request, doc.props, event)
- directory.update(doc.guid, doc.props, event)
-
- @route('PUT', [None, None, None],
- acl=ACL.AUTH | ACL.AUTHOR)
- def update_prop(self, request, url=None):
- if url:
- value = Blob({'url': url})
- elif request.content is None:
+ self.on_update(request, doc.props)
+ self.volume[request.resource].update(doc.guid, doc.props)
+ self.after_post(doc)
+
+ @route('GET', [None, None], cmd='diff', mime_type='application/json')
+ def diff(self, request):
+ result = {}
+ res = self.volume[request.resource][request.guid]
+ for prop, meta, __ in res.diff(toolkit.Sequence([[0, None]])):
+ result[prop] = meta
+ return result
+
+ @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR)
+ def update_prop(self, request):
+ if request.content is None:
value = request.content_stream
else:
value = request.content
request.content = {request.prop: value}
self.update(request)
- @route('DELETE', [None, None],
- acl=ACL.AUTH | ACL.AUTHOR)
+ @route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
def delete(self, request):
- self.volume[request.resource].delete(request.guid)
+ # Node data should not be deleted immediately
+ # to make master-slave synchronization possible
+ request.content = {'layer': 'deleted'}
+ self.update(request)
@route('GET', [None, None], arguments={'reply': list},
mime_type='application/json')
@@ -101,65 +121,56 @@ class Routes(object):
if not reply:
reply = []
for prop in self.volume[request.resource].metadata.values():
- if prop.acl & ACL.READ and not (prop.acl & ACL.LOCAL):
+ if prop.acl & ACL.READ and not (prop.acl & ACL.LOCAL) and \
+ not isinstance(prop, Aggregated):
reply.append(prop.name)
self._preget(request)
doc = self.volume[request.resource].get(request.guid)
- return self._get_props(doc, request, reply)
+ enforce('deleted' not in doc['layer'], http.NotFound, 'Deleted')
+ return self._postget(request, doc, reply)
@route('GET', [None, None, None], mime_type='application/json')
def get_prop(self, request, response):
- return self._prop_meta(request, response)
+ directory = self.volume[request.resource]
+ doc = directory.get(request.guid)
+
+ prop = directory.metadata[request.prop]
+ prop.assert_access(ACL.READ)
+
+ meta = doc.meta(prop.name) or {}
+ if 'value' in meta:
+ value = _get_prop(doc, prop, meta.pop('value'))
+ enforce(value is not toolkit.File.AWAY, http.NotFound, 'No blob')
+ else:
+ value = prop.default
+
+ response.meta = meta
+ response.last_modified = meta.get('mtime')
+ if isinstance(value, toolkit.File):
+ response.content_length = value.get('size') or 0
+ else:
+ response.content_length = len(json.dumps(value))
+
+ return value
@route('HEAD', [None, None, None])
def get_prop_meta(self, request, response):
- self._prop_meta(request, response)
+ self.get_prop(request, response)
@route('POST', [None, None, None],
acl=ACL.AUTH, mime_type='application/json')
def insert_to_aggprop(self, request):
- content = request.content or {}
- enforce(isinstance(content, dict), http.BadRequest, 'Invalid value')
-
- directory = self.volume[request.resource]
- prop = directory.metadata[request.prop]
+ return self._aggpost(request, ACL.INSERT)
- enforce(prop.typecast is AggregatedType, http.BadRequest,
- 'Property is not aggregated')
- prop.assert_access(ACL.INSERT)
- self.on_aggprop_update(request, prop, None)
-
- if request.principal:
- authors = content['author'] = {}
- self._useradd(authors, request.principal, ACL.ORIGINAL)
- guid = content.pop('guid') if 'guid' in content else toolkit.uuid()
- props = {request.prop: {guid: content}}
- event = {}
- self.on_update(request, props, event)
- directory.update(request.guid, props, event)
-
- return guid
+ @route('PUT', [None, None, None, None],
+ acl=ACL.AUTH, mime_type='application/json')
+ def update_aggprop(self, request):
+ self._aggpost(request, ACL.REPLACE, request.key)
@route('DELETE', [None, None, None, None],
acl=ACL.AUTH, mime_type='application/json')
def remove_from_aggprop(self, request):
- directory = self.volume[request.resource]
- doc = directory.get(request.guid)
- prop = directory.metadata[request.prop]
-
- enforce(prop.typecast is AggregatedType, http.BadRequest,
- 'Property is not aggregated')
- prop.assert_access(ACL.REMOVE)
-
- guid = request.path[3]
- enforce(guid in doc[request.prop], http.NotFound,
- 'No such aggregated item')
- self.on_aggprop_update(request, prop, doc[request.prop][guid])
-
- props = {request.prop: {guid: {}}}
- event = {}
- self.on_update(request, props, event)
- directory.update(request.guid, props, event)
+ self._aggpost(request, ACL.REMOVE, request.key)
@route('PUT', [None, None], cmd='useradd',
arguments={'role': 0}, acl=ACL.AUTH | ACL.AUTHOR)
@@ -180,7 +191,11 @@ class Routes(object):
del authors[user]
directory.update(request.guid, {'author': authors})
- def on_create(self, request, props, event):
+ @fallbackroute('GET', ['blobs'])
+ def blobs(self, request):
+ return files.get(request.guid)
+
+ def on_create(self, request, props):
ts = int(time.time())
props['ctime'] = ts
props['mtime'] = ts
@@ -189,7 +204,7 @@ class Routes(object):
authors = props['author'] = {}
self._useradd(authors, request.principal, ACL.ORIGINAL)
- def on_update(self, request, props, event):
+ def on_update(self, request, props):
props['mtime'] = int(time.time())
def on_aggprop_update(self, request, prop, value):
@@ -200,103 +215,57 @@ class Routes(object):
@contextmanager
def _post(self, request, access):
- content = request.content or {}
- enforce(isinstance(content, dict), 'Invalid value')
-
+ content = request.content
+ enforce(isinstance(content, dict), http.BadRequest, 'Invalid value')
directory = self.volume[request.resource]
- if request.guid:
- doc = directory.get(request.guid)
- else:
- doc = directory.resource_class(None, {})
- doc.request = request
- blobs = []
-
- for name, value in content.items():
- prop = directory.metadata[name]
- if isinstance(prop, BlobProperty):
- prop.assert_access(ACL.CREATE if
- access == ACL.WRITE and doc.meta(name) is None
- else access)
- if value is None:
- value = {'blob': None}
- elif isinstance(value, basestring) or hasattr(value, 'read'):
- value = _read_blob(request, prop, value)
- blobs.append(value['blob'])
- elif isinstance(value, dict):
- enforce('url' in value or 'blob' in value, 'No bundle')
- else:
- raise RuntimeError('Incorrect BLOB value')
- else:
- prop.assert_access(access)
- if prop.localized and isinstance(value, basestring):
- value = {request.accept_language[0]: value}
- try:
- value = _typecast_prop_value(prop.typecast, value)
- except Exception, error:
- error = 'Value %r for %r property is invalid: %s' % \
- (value, prop.name, error)
- toolkit.exception(error)
- raise RuntimeError(error)
- doc[name] = value
if access == ACL.CREATE:
- for name, prop in directory.metadata.items():
- if not isinstance(prop, BlobProperty) and \
- content.get(name) is None and \
- (prop.default is not None or prop.on_set is not None):
- doc[name] = prop.default
- if doc['guid']:
+ doc = directory.resource_class(None, None)
+ if 'guid' in content:
# TODO Temporal security hole, see TODO
- enforce(not self.volume[request.resource].exists(doc['guid']),
- '%s already exists', doc['guid'])
- enforce(_GUID_RE.match(doc['guid']) is not None,
- 'Malformed %s GUID', doc['guid'])
+ guid = content['guid']
+ enforce(not directory.exists(guid),
+ http.BadRequest, '%s already exists', guid)
+ enforce(_GUID_RE.match(guid) is not None,
+ http.BadRequest, 'Malformed %s GUID', guid)
else:
- doc['guid'] = toolkit.uuid()
+ doc.props['guid'] = toolkit.uuid()
+ for name, prop in directory.metadata.items():
+ if name not in content and prop.default is not None:
+ doc.props[name] = prop.default
+ orig = None
+ this.resource = doc
+ else:
+ doc = directory.get(request.guid)
+ orig = directory.get(request.guid)
+ this.resource = orig
- try:
- for name, value in doc.props.items():
+ def teardown(new):
+ if orig is None:
+ return
+ for name, orig_value in orig.props.items():
+ if doc[name] == orig_value:
+ continue
prop = directory.metadata[name]
- if prop.on_set is not None:
- doc.props[name] = prop.on_set(doc, value)
- yield directory, doc
- finally:
- for path in blobs:
- if exists(path):
- os.unlink(path)
+ prop.teardown(doc[name] if new else orig_value)
- self.after_post(doc)
-
- def _prop_meta(self, request, response):
- directory = self.volume[request.resource]
- prop = directory.metadata[request.prop]
- doc = directory.get(request.guid)
- doc.request = request
-
- prop.assert_access(ACL.READ)
-
- if isinstance(prop, StoredProperty):
- meta = doc.meta(prop.name) or {}
- if 'value' in meta:
- del meta['value']
- value = doc.get(prop.name, request.accept_language)
- value = prop.on_get(doc, value)
- response.content_length = len(json.dumps(value))
+ try:
+ for name, value in content.items():
+ prop = directory.metadata[name]
+ prop.assert_access(access, orig[name] if orig else None)
+ try:
+ doc.props[name] = prop.typecast(value)
+ except Exception, error:
+ error = 'Value %r for %r property is invalid: %s' % \
+ (value, prop.name, error)
+ toolkit.exception(error)
+ raise http.BadRequest(error)
+ yield doc
+ except Exception:
+ teardown(True)
+ raise
else:
- value = prop.on_get(doc, doc.meta(prop.name))
- enforce(value is not None and ('blob' in value or 'url' in value),
- http.NotFound, 'BLOB does not exist')
- if 'blob' in value:
- meta = value.copy()
- meta.pop('blob')
- else:
- meta = value
-
- response.meta = meta
- response.last_modified = meta.get('mtime')
- response.content_length = meta.get('blob_size') or 0
-
- return value
+ teardown(False)
def _preget(self, request):
reply = request.get('reply')
@@ -307,27 +276,11 @@ class Routes(object):
for prop in reply:
directory.metadata[prop].assert_access(ACL.READ)
- def _get_props(self, doc, request, props):
+ def _postget(self, request, doc, props):
result = {}
- metadata = doc.metadata
- doc.request = request
for name in props:
- prop = metadata[name]
- value = prop.on_get(doc, doc.get(name, request.accept_language))
- if value is None:
- value = prop.default
- elif isinstance(value, Blob):
- for key in ('mtime', 'seqno', 'blob'):
- if key in value:
- del value[key]
- url = value.get('url')
- if url is None:
- value['url'] = '/'.join([
- request.static_prefix, metadata.name, doc.guid, name,
- ])
- elif url.startswith('/'):
- value['url'] = request.static_prefix + url
- result[name] = value
+ prop = doc.metadata[name]
+ result[name] = _get_prop(doc, prop, doc.get(name))
return result
def _useradd(self, authors, user, role):
@@ -351,83 +304,48 @@ class Routes(object):
props['order'] = 0
authors[user] = props
+ def _aggpost(self, request, acl, aggid=None):
+ doc = this.resource = self.volume[request.resource][request.guid]
+ prop = doc.metadata[request.prop]
+ enforce(isinstance(prop, Aggregated), http.BadRequest,
+ 'Property is not aggregated')
+ prop.assert_access(acl)
-def _read_blob(request, prop, value):
- digest = hashlib.sha1()
- dst = toolkit.NamedTemporaryFile(delete=False)
-
- try:
- if isinstance(value, basestring):
- digest.update(value)
- dst.write(value)
- else:
- size = request.content_length or sys.maxint
- while size > 0:
- chunk = value.read(min(size, toolkit.BUFFER_SIZE))
- if not chunk:
- break
- dst.write(chunk)
- size -= len(chunk)
- digest.update(chunk)
- except Exception:
- os.unlink(dst.name)
- raise
- finally:
- dst.close()
-
- if request.prop and request.content_type:
- mime_type = request.content_type
- else:
- mime_type = prop.mime_type
-
- return {'blob': dst.name,
- 'digest': digest.hexdigest(),
- 'mime_type': mime_type,
- }
-
-
-def _typecast_prop_value(typecast, value):
- if typecast is None:
- return value
- enforce(value is not None, ValueError, 'Property value cannot be None')
-
- def cast(typecast, value):
- if isinstance(typecast, types.FunctionType):
- return typecast(value)
- elif typecast is unicode:
- return value.encode('utf-8')
- elif typecast is str:
- return str(value)
- elif typecast is int:
- return int(value)
- elif typecast is float:
- return float(value)
- elif typecast is bool:
- return bool(value)
- elif typecast is dict:
- return dict(value)
+ if aggid and aggid in doc[request.prop]:
+ aggvalue = doc[request.prop][aggid]
+ self.on_aggprop_update(request, prop, aggvalue)
+ prop.subteardown(aggvalue['value'])
else:
- raise ValueError('Unknown typecast')
+ enforce(acl != ACL.REMOVE, http.NotFound, 'No aggregated item')
+ self.on_aggprop_update(request, prop, None)
+
+ aggvalue = {}
+ if acl != ACL.REMOVE:
+ value = prop.subtypecast(
+ request.content_stream if request.content is None
+ else request.content)
+ if type(value) is tuple:
+ aggid_, value = value
+ enforce(not aggid or aggid == aggid_, http.BadRequest,
+ 'Wrong aggregated id')
+ aggid = aggid_
+ elif not aggid:
+ aggid = toolkit.uuid()
+ aggvalue['value'] = value
+
+ if request.principal:
+ authors = aggvalue['author'] = {}
+ role = ACL.ORIGINAL if request.principal in doc['author'] else 0
+ self._useradd(authors, request.principal, role)
+ props = {request.prop: {aggid: aggvalue}}
+ self.on_update(request, props)
+ self.volume[request.resource].update(request.guid, props)
+
+ return aggid
- if type(typecast) in LIST_TYPES:
- if typecast:
- first = iter(typecast).next()
- else:
- first = None
- if first is not None and type(first) is not type and \
- type(first) not in LIST_TYPES:
- value = cast(type(first), value)
- enforce(value in typecast, ValueError,
- "Value %r is not in '%s' list",
- value, ', '.join([str(i) for i in typecast]))
- else:
- enforce(len(typecast) <= 1, ValueError,
- 'List values should contain values of the same type')
- if type(value) not in LIST_TYPES:
- value = (value,)
- typecast, = typecast or [str]
- value = tuple([_typecast_prop_value(typecast, i) for i in value])
- else:
- value = cast(typecast, value)
+def _get_prop(doc, prop, value):
+ value = prop.reprcast(value)
+ if prop.on_get is not None:
+ value = prop.on_get(doc, value)
return value