Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2014-04-27 14:52:25 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2014-04-27 14:52:25 (GMT)
commit2dbc9b554f322ea23b224d923d9a6475e33ad6e9 (patch)
tree55df42ddf7a0ec8d4ca6ef007218b1056409dc0b /sugar_network
parent046073b04229021ec53833a353ffd069d0a5b561 (diff)
Implementation polishing
* http.request does not load posting stream before sending; * one-segment packets; * move node related code to node module; * Principal capabilities; * batch posting while pushing client offline updates.
Diffstat (limited to 'sugar_network')
-rw-r--r--sugar_network/client/__init__.py10
-rw-r--r--sugar_network/client/auth.py4
-rw-r--r--sugar_network/client/journal.py4
-rw-r--r--sugar_network/client/model.py62
-rw-r--r--sugar_network/client/routes.py113
-rw-r--r--sugar_network/db/__init__.py2
-rw-r--r--sugar_network/db/blobs.py156
-rw-r--r--sugar_network/db/directory.py57
-rw-r--r--sugar_network/db/index.py20
-rw-r--r--sugar_network/db/metadata.py6
-rw-r--r--sugar_network/db/resource.py2
-rw-r--r--sugar_network/db/routes.py119
-rw-r--r--sugar_network/db/storage.py5
-rw-r--r--sugar_network/db/volume.py86
-rw-r--r--sugar_network/model/__init__.py244
-rw-r--r--sugar_network/model/context.py8
-rw-r--r--sugar_network/model/post.py11
-rw-r--r--sugar_network/model/report.py2
-rw-r--r--sugar_network/model/routes.py2
-rw-r--r--sugar_network/node/auth.py49
-rw-r--r--sugar_network/node/master.py21
-rw-r--r--sugar_network/node/model.py521
-rw-r--r--sugar_network/node/routes.py129
-rw-r--r--sugar_network/node/slave.py19
-rw-r--r--sugar_network/toolkit/__init__.py6
-rw-r--r--sugar_network/toolkit/coroutine.py24
-rw-r--r--sugar_network/toolkit/http.py74
-rw-r--r--sugar_network/toolkit/packets.py (renamed from sugar_network/toolkit/parcel.py)298
-rw-r--r--sugar_network/toolkit/router.py93
-rw-r--r--sugar_network/toolkit/spec.py2
30 files changed, 1227 insertions, 922 deletions
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py
index 648d418..b985f90 100644
--- a/sugar_network/client/__init__.py
+++ b/sugar_network/client/__init__.py
@@ -15,7 +15,6 @@
import os
import logging
-from base64 import b64encode
from os.path import join, expanduser, exists
from sugar_network.toolkit import http, Option
@@ -166,10 +165,15 @@ def stability(context):
return value.split()
-def Connection(url=None, **args):
+def Connection(url=None, creds=None, **kwargs):
if url is None:
url = api.value
- return http.Connection(url, verify=not no_check_certificate.value, **args)
+ if creds is None and keyfile.value:
+ from sugar_network.client.auth import SugarCreds
+ creds = SugarCreds(keyfile.value)
+ return http.Connection(url,
+ auth_request={'method': 'GET', 'params': {'cmd': 'logon'}},
+ creds=creds, verify=not no_check_certificate.value, **kwargs)
def IPCConnection():
diff --git a/sugar_network/client/auth.py b/sugar_network/client/auth.py
index db95aa5..c1c86ed 100644
--- a/sugar_network/client/auth.py
+++ b/sugar_network/client/auth.py
@@ -15,11 +15,15 @@
import os
import hashlib
+import logging
from base64 import b64encode
from urllib2 import parse_http_list, parse_keqv_list
from os.path import abspath, expanduser, dirname, exists
+_logger = logging.getLogger('client.auth')
+
+
class BasicCreds(object):
def __init__(self, login, password):
diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py
index 5a6f894..de0fbf8 100644
--- a/sugar_network/client/journal.py
+++ b/sugar_network/client/journal.py
@@ -141,13 +141,13 @@ class Routes(object):
subrequest = Request(method='PUT', document='artifact',
guid=subguid, prop='preview')
subrequest.content_type = 'image/png'
- with file(preview_path, 'rb') as subrequest.content_stream:
+ with file(preview_path, 'rb') as subrequest.content:
self.fallback(subrequest)
subrequest = Request(method='PUT', document='artifact',
guid=subguid, prop='data')
subrequest.content_type = get(guid, 'mime_type') or 'application/octet'
- with file(data_path, 'rb') as subrequest.content_stream:
+ with file(data_path, 'rb') as subrequest.content:
self.fallback(subrequest)
def journal_update(self, guid, data=None, **kwargs):
diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py
index fd85a4d..0c5991f 100644
--- a/sugar_network/client/model.py
+++ b/sugar_network/client/model.py
@@ -20,8 +20,8 @@ from sugar_network.model.user import User
from sugar_network.model.post import Post
from sugar_network.model.report import Report
from sugar_network.model.context import Context as _Context
+from sugar_network.toolkit.router import ACL, File
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit.router import ACL
_logger = logging.getLogger('client.model')
@@ -43,3 +43,63 @@ class Volume(db.Volume):
db.Volume.__init__(self, root, resources)
for directory in self.values():
directory.metadata['author'].acl |= ACL.LOCAL
+
+
+def dump_volume(volume):
+ for resource, directory in volume.items():
+ if not directory.has_seqno:
+ continue
+
+ for doc in directory:
+ if not doc['seqno'] or doc['state'] != 'active':
+ continue
+
+ dump = {}
+ op = dump['op'] = {}
+ props = dump['content'] = {}
+ keys = []
+ postfix = []
+
+ for name, prop in doc.metadata.items():
+ meta = doc.meta(name)
+ if meta is None or 'seqno' not in meta:
+ continue
+ if isinstance(prop, db.Aggregated):
+ for aggid, value in doc.repr(name):
+ aggop = {
+ 'method': 'POST',
+ 'path': [resource, doc.guid, name, aggid],
+ }
+ if isinstance(value, File):
+ value.meta['op'] = aggop
+ postfix.append(value)
+ else:
+ postfix.append({'op': aggop, 'content': value})
+ elif prop.acl & (ACL.WRITE | ACL.CREATE):
+ if isinstance(prop, db.Blob):
+ blob = volume.blobs.get(doc[name])
+ blob.meta['op'] = {
+ 'method': 'PUT',
+ 'path': [resource, doc.guid, name],
+ }
+ postfix.append(blob)
+ else:
+ if isinstance(prop, db.Reference):
+ keys.append(name)
+ props[name] = doc[name]
+
+ if 'seqno' in doc.meta('guid'):
+ keys.append('guid')
+ props['guid'] = doc.guid
+ op['method'] = 'POST'
+ op['path'] = [resource]
+ else:
+ op['method'] = 'PUT'
+ op['path'] = [resource, doc.guid]
+
+ if keys:
+ dump['keys'] = keys
+
+ yield dump
+ for dump in postfix:
+ yield dump
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py
index f618df3..8a037ee 100644
--- a/sugar_network/client/routes.py
+++ b/sugar_network/client/routes.py
@@ -20,11 +20,12 @@ from os.path import join
from sugar_network import db, client, node, toolkit
from sugar_network.model import FrontRoutes
+from sugar_network.client import model
from sugar_network.client.journal import Routes as JournalRoutes
from sugar_network.toolkit.router import Request, Router, Response
from sugar_network.toolkit.router import route, fallbackroute
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel
+from sugar_network.toolkit import netlink, zeroconf, coroutine, http, packets
from sugar_network.toolkit import ranges, lsb_release, enforce
@@ -54,10 +55,6 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
self._no_subscription = no_subscription
self._pull_r = toolkit.Bin(
join(home_volume.root, 'var', 'pull'), [[1, None]])
- self._push_r = toolkit.Bin(
- join(home_volume.root, 'var', 'push'), [[1, None]])
- self._push_guids_map = toolkit.Bin(
- join(home_volume.root, 'var', 'push-guids'), {})
def connect(self, api=None):
if self._connect_jobs:
@@ -123,7 +120,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
result = self.fallback()
result['route'] = 'proxy'
else:
- result = {'roles': [], 'route': 'offline'}
+ result = {'route': 'offline'}
result['guid'] = self._creds.login
return result
@@ -141,7 +138,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
for logfile in logs:
with file(logfile) as f:
self.fallback(method='POST', path=['report', guid, 'logs'],
- content_stream=f, content_type='text/plain')
+ content=f, content_type='text/plain')
yield {'event': 'done', 'guid': guid}
@route('GET', ['context', None], cmd='launch', arguments={'args': list},
@@ -196,7 +193,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
if pins and item['mtime'] > checkin['mtime']:
pull = Request(method='GET',
path=[checkin.metadata.name, checkin.guid], cmd='diff')
- self._sync_jobs.spawn(self._pull_checkin, pull, None, 'range')
+ self._sync_jobs.spawn(self._pull_checkin, pull, None, 'ranges')
return result
@route('GET', [None, None], mime_type='application/json')
@@ -353,7 +350,7 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
_logger.debug('Checkin %r context', local_context.guid)
pull = Request(method='GET',
path=['context', local_context.guid], cmd='diff')
- self._pull_checkin(pull, None, 'range')
+ self._pull_checkin(pull, None, 'ranges')
pins = local_context['pins']
if pin and pin not in pins:
contexts.update(local_context.guid, {'pins': pins + [pin]})
@@ -374,79 +371,69 @@ class ClientRoutes(FrontRoutes, JournalRoutes):
def _pull_checkin(self, request, response, header_key):
request.headers[header_key] = self._pull_r.value
- patch = self.fallback(request, response)
- __, committed = self._local.volume.patch(next(parcel.decode(patch)),
- shift_seqno=False)
- ranges.exclude(self._pull_r.value, committed)
+ packet = packets.decode(self.fallback(request, response))
- def _sync(self):
- _logger.info('Start pulling updates')
+ volume = self._local.volume
+ volume[request.resource].patch(request.guid, packet['patch'])
+ for blob in packet:
+ volume.blobs.patch(blob)
+ ranges.exclude(self._pull_r.value, packet['ranges'])
+
+ def _pull(self):
+ _logger.debug('Start pulling checkin updates')
+ response = Response()
for directory in self._local.volume.values():
if directory.empty:
continue
request = Request(method='GET',
path=[directory.metadata.name], cmd='diff')
- response = Response()
while True:
- request.headers['range'] = self._pull_r.value
- r, guids = self.fallback(request, response)
- if not r:
+ request.headers['ranges'] = self._pull_r.value
+ diff = self.fallback(request, response)
+ if not diff:
break
- for guid in guids:
+ for guid, r in diff.items():
checkin = Request(method='GET',
path=[request.resource, guid], cmd='diff')
- self._pull_checkin(checkin, response, 'range')
- ranges.exclude(self._pull_r.value, r)
- self._pull_r.commit()
- this.localcast({'event': 'sync', 'state': 'pull'})
-
- """
- resource = None
- metadata = None
-
- for diff in self._local.volume.diff(self._push_r.value, blobs=False):
- if 'resource' in diff:
- resource = diff['resource']
- metadata = self._local.volume[resource]
- elif 'commit' in diff:
- ranges.exclude(self._push_r.value, diff['commit'])
- self._push_r.commit()
- # No reasons to keep failure reports after pushing
- self._local.volume['report'].wipe()
- else:
- props = {}
- blobs = []
- for prop, meta in diff['patch'].items():
- if isinstance(metadata[prop], db.Blob):
- blobs.application
-
-
-
- props[prop] = meta['value']
-
-
-
- if isinstance(diff, File):
- with file(diff.path, 'rb') as f:
- self.fallback(method='POST')
+ self._pull_checkin(checkin, response, 'ranges')
+ ranges.exclude(self._pull_r.value, r)
+ def _push(self):
+ volume = self._local.volume
+ _logger.debug('Start pushing offline updates')
+ dump = packets.encode(model.dump_volume(volume))
+ request = Request(method='POST', cmd='apply', content=dump)
+ self.fallback(request, Response())
+ _logger.debug('Wipeout offline updates')
+ for directory in volume.values():
+ if directory.empty:
+ continue
+ if directory.has_noseqno:
+ directory.dilute()
+ else:
+ directory.wipe()
- pass
+ _logger.debug('Wipeout offline blobs')
+ for blob in volume.blobs.walk():
+ if int(blob.meta['x-seqno']):
+ volume.blobs.wipe(blob)
- if 'guid' in props:
- request = Request(method='POST', path=[resource])
- else:
- request = Request(method='PUT', path=[resource, guid])
- request.content_type = 'application/json'
- request.content = props
- self.fallback(request)
- """
+ def _sync(self):
+ try:
+ self._pull()
+ if self._local.volume.has_seqno:
+ self._push()
+ except:
+ this.localcast({'event': 'sync', 'state': 'failed'})
+ raise
+ else:
+ this.localcast({'event': 'sync', 'state': 'done'})
class _LocalRoutes(db.Routes, Router):
diff --git a/sugar_network/db/__init__.py b/sugar_network/db/__init__.py
index d6b12c5..6a6b27c 100644
--- a/sugar_network/db/__init__.py
+++ b/sugar_network/db/__init__.py
@@ -351,7 +351,7 @@ Volume
from sugar_network.db.metadata import \
stored_property, indexed_property, Property, Numeric, Boolean, Dict, \
- Enum, List, Aggregated, Blob, Localized
+ Enum, List, Aggregated, Blob, Localized, Reference
from sugar_network.db.index import index_flush_timeout, \
index_flush_threshold, index_write_queue
from sugar_network.db.resource import Resource
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index ce5bb1b..94e914c 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -43,6 +43,8 @@ class Blobs(object):
def path(self, path=None):
if path is None:
return join(self._root, 'files')
+ if isinstance(path, File):
+ return self._blob_path(path.digest)
if isinstance(path, basestring):
path = path.split(os.sep)
if len(path) == 1 and len(path[0]) == 40 and '.' not in path[0]:
@@ -51,7 +53,47 @@ class Blobs(object):
return join(assets.PATH, *path[1:])
return join(self._root, 'files', *path)
+ def walk(self, path=None, include=None, recursive=True, all_files=False):
+ if path is None:
+ is_files = False
+ root = self._blob_path()
+ else:
+ path = path.strip('/').split('/')
+ enforce(not [i for i in path if i == '..'],
+ http.BadRequest, 'Relative paths are not allowed')
+ is_files = True
+ root = self.path(path)
+
+ for root, __, files in os.walk(root):
+ if include is not None and \
+ not ranges.contains(include, int(os.stat(root).st_mtime)):
+ continue
+ api_path = root[len(self._root) + 7:] if is_files else None
+ for filename in files:
+ if filename.endswith(_META_SUFFIX):
+ if not all_files:
+ digest = filename[:-len(_META_SUFFIX)]
+ path = join(root, digest)
+ yield File(path, digest, _read_meta(path))
+ continue
+ elif not all_files:
+ continue
+ yield root, api_path, filename
+ if not recursive:
+ break
+
def post(self, content, mime_type=None, digest_to_assert=None, meta=None):
+ if isinstance(content, File):
+ seqno = self._seqno.next()
+ meta = content.meta.copy()
+ meta['x-seqno'] = str(seqno)
+ path = self._blob_path(content.digest)
+ if not exists(dirname(path)):
+ os.makedirs(dirname(path))
+ os.link(content.path, path)
+ _write_meta(path, meta, seqno)
+ return File(path, content.digest, meta)
+
if meta is None:
meta = []
meta.append(('content-type',
@@ -94,9 +136,8 @@ class Blobs(object):
seqno = self._seqno.next()
meta.append(('content-length', str(blob.tell())))
meta.append(('x-seqno', str(seqno)))
- _write_meta(path, meta, seqno)
blob.name = path
- os.utime(path, (seqno, seqno))
+ _write_meta(path, meta, seqno)
_logger.debug('Post %r file', path)
@@ -121,75 +162,67 @@ class Blobs(object):
def delete(self, path):
self._delete(self.path(path), None)
+ def wipe(self, path):
+ path = self.path(path)
+ if exists(path + _META_SUFFIX):
+ os.unlink(path + _META_SUFFIX)
+ if exists(path):
+ _logger.debug('Wipe %r file', path)
+ os.unlink(path)
+
def populate(self, path=None, recursive=True):
for __ in self.diff([[1, None]], path or '', recursive):
pass
def diff(self, r, path=None, recursive=True):
- if path is None:
- is_files = False
- root = self._blob_path()
- else:
- path = path.strip('/').split('/')
- enforce(not [i for i in path if i == '..'],
- http.BadRequest, 'Relative paths are not allowed')
- is_files = True
- root = self.path(path)
+ is_files = path is not None
checkin_seqno = None
- for root, __, files in os.walk(root):
- if not ranges.contains(r, int(os.stat(root).st_mtime)):
- continue
- rel_root = root[len(self._root) + 7:] if is_files else None
- for filename in files:
- path = join(root, filename)
- if filename.endswith(_META_SUFFIX):
- seqno = int(os.stat(path).st_mtime)
- path = path[:-len(_META_SUFFIX)]
- meta = None
- if exists(path):
- stat = os.stat(path)
- if seqno != int(stat.st_mtime):
- _logger.debug('Found updated %r blob', path)
- seqno = self._seqno.next()
- meta = _read_meta(path)
- meta['x-seqno'] = str(seqno)
- meta['content-length'] = str(stat.st_size)
- _write_meta(path, meta, seqno)
- os.utime(path, (seqno, seqno))
- if not ranges.contains(r, seqno):
- continue
- if meta is None:
+ for root, rel_root, filename in self.walk(path, r, recursive, True):
+ path = join(root, filename)
+ if filename.endswith(_META_SUFFIX):
+ seqno = int(os.stat(path).st_mtime)
+ path = path[:-len(_META_SUFFIX)]
+ meta = None
+ if exists(path):
+ stat = os.stat(path)
+ if seqno != int(stat.st_mtime):
+ _logger.debug('Found updated %r blob', path)
+ seqno = self._seqno.next()
meta = _read_meta(path)
- if is_files:
- digest = join(rel_root, filename[:-len(_META_SUFFIX)])
- meta['path'] = digest
- else:
- digest = filename[:-len(_META_SUFFIX)]
- elif not is_files or exists(path + _META_SUFFIX):
+ meta['x-seqno'] = str(seqno)
+ meta['content-length'] = str(stat.st_size)
+ _write_meta(path, meta, seqno)
+ if not ranges.contains(r, seqno):
continue
+ if meta is None:
+ meta = _read_meta(path)
+ if is_files:
+ digest = join(rel_root, filename[:-len(_META_SUFFIX)])
+ meta['path'] = digest
else:
- _logger.debug('Found new %r blob', path)
- mime_type = mimetypes.guess_type(filename)[0] or \
- 'application/octet-stream'
- if checkin_seqno is None:
- checkin_seqno = self._seqno.next()
- seqno = checkin_seqno
- meta = [('content-type', mime_type),
- ('content-length', str(os.stat(path).st_size)),
- ('x-seqno', str(seqno)),
- ]
- _write_meta(path, meta, seqno)
- os.utime(path, (seqno, seqno))
- if not ranges.contains(r, seqno):
- continue
- digest = join(rel_root, filename)
- meta.append(('path', digest))
- yield File(path, digest, meta)
- if not recursive:
- break
+ digest = filename[:-len(_META_SUFFIX)]
+ elif not is_files or exists(path + _META_SUFFIX):
+ continue
+ else:
+ _logger.debug('Found new %r blob', path)
+ mime_type = mimetypes.guess_type(filename)[0] or \
+ 'application/octet-stream'
+ if checkin_seqno is None:
+ checkin_seqno = self._seqno.next()
+ seqno = checkin_seqno
+ meta = [('content-type', mime_type),
+ ('content-length', str(os.stat(path).st_size)),
+ ('x-seqno', str(seqno)),
+ ]
+ _write_meta(path, meta, seqno)
+ if not ranges.contains(r, seqno):
+ continue
+ digest = join(rel_root, filename)
+ meta.append(('path', digest))
+ yield File(path, digest, meta)
- def patch(self, patch, seqno):
+ def patch(self, patch, seqno=0):
if 'path' in patch.meta:
path = self.path(patch.meta.pop('path'))
else:
@@ -207,7 +240,6 @@ class Blobs(object):
meta = patch.meta
meta['x-seqno'] = str(seqno)
_write_meta(path, meta, seqno)
- os.utime(path, (seqno, seqno))
def _delete(self, path, seqno):
if exists(path + _META_SUFFIX):
@@ -228,6 +260,8 @@ class Blobs(object):
def _write_meta(path, meta, seqno):
+ if seqno:
+ os.utime(path, (seqno, seqno))
path += _META_SUFFIX
with toolkit.new_file(path) as f:
for key, value in meta.items() if isinstance(meta, dict) else meta:
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py
index 17ff27d..79e7332 100644
--- a/sugar_network/db/directory.py
+++ b/sugar_network/db/directory.py
@@ -27,6 +27,10 @@ from sugar_network.toolkit import enforce
# To invalidate existed index on stcuture changes
_LAYOUT_VERSION = 4
+_STATE_HAS_SEQNO = 1
+_STATE_HAS_NOSEQNO = 2
+
+
_logger = logging.getLogger('db.directory')
@@ -53,12 +57,26 @@ class Directory(object):
self._storage = None
self._index = None
self._broadcast = broadcast
+ self._state = toolkit.Bin(
+ join(root, 'index', self.metadata.name, 'state'), 0)
self._open()
@property
def empty(self):
- return True if self._index is None else (self._index.mtime == 0)
+ return not self._state.value & (_STATE_HAS_SEQNO | _STATE_HAS_NOSEQNO)
+
+ @property
+ def has_seqno(self):
+ return self._state.value & _STATE_HAS_SEQNO
+
+ @property
+ def has_noseqno(self):
+ return self._state.value & _STATE_HAS_NOSEQNO
+
+ def __iter__(self):
+ for guid in self._storage.walk(0):
+ yield self.get(guid)
def wipe(self):
self.close()
@@ -67,8 +85,23 @@ class Directory(object):
ignore_errors=True)
shutil.rmtree(join(self._root, 'db', self.metadata.name),
ignore_errors=True)
+ self._state.value = 0
self._open()
+ def dilute(self):
+ for doc in self:
+ if 'seqno' in doc.record.get('guid'):
+ self._index.delete(doc.guid, self._postdelete, doc.guid, None)
+ continue
+ doc.record.unset('seqno')
+ for prop in self.metadata.keys():
+ meta = doc.record.get(prop)
+ if meta is None or 'seqno' not in meta:
+ continue
+ meta.pop('seqno')
+ doc.record.set(prop, **meta)
+ self._state.value ^= _STATE_HAS_SEQNO
+
def close(self):
"""Flush index write pending queue and close the index."""
if self._index is None:
@@ -158,9 +191,9 @@ class Directory(object):
"""
found = False
- migrate = (self._index.mtime == 0)
+ migrate = self.empty
- for guid in self._storage.walk(self._index.mtime):
+ for guid in self._storage.walk(self._state.mtime):
if not found:
_logger.info('Start populating %r index', self.metadata.name)
found = True
@@ -175,7 +208,7 @@ class Directory(object):
meta = record.get(name)
if meta is not None:
props[name] = meta['value']
- self._index.store(guid, props)
+ self._index.store(guid, props, self._preindex)
yield
except Exception:
_logger.exception('Cannot populate %r in %r, invalidate it',
@@ -195,7 +228,7 @@ class Directory(object):
for doc in docs:
yield doc
- def patch(self, guid, patch, seqno=None):
+ def patch(self, guid, patch, seqno=False):
"""Apply changes for documents."""
doc = self.resource(guid, self._storage.get(guid))
merged = False
@@ -239,6 +272,10 @@ class Directory(object):
doc = self.resource(guid, self._storage.get(guid), changes)
for prop in self.metadata:
enforce(doc[prop] is not None, 'Empty %r property', prop)
+ if changes.get('seqno'):
+ self._state.value |= _STATE_HAS_SEQNO
+ else:
+ self._state.value |= _STATE_HAS_NOSEQNO
return doc
def _prestore(self, guid, changes, event):
@@ -253,15 +290,21 @@ class Directory(object):
return None
for prop in self.metadata.keys():
enforce(doc[prop] is not None, 'Empty %r property', prop)
+ if changes.get('seqno'):
+ self._state.value |= _STATE_HAS_SEQNO
+ else:
+ self._state.value |= _STATE_HAS_NOSEQNO
return doc
def _postdelete(self, guid, event):
self._storage.delete(guid)
- self.broadcast(event)
+ if event:
+ self.broadcast(event)
def _postcommit(self):
self._seqno.commit()
- self.broadcast({'event': 'commit', 'mtime': self._index.mtime})
+ self._state.commit()
+ self.broadcast({'event': 'commit', 'mtime': self._state.mtime})
def _save_layout(self):
path = join(self._root, 'index', self.metadata.name, 'layout')
diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py
index 0270dd4..b46fe1b 100644
--- a/sugar_network/db/index.py
+++ b/sugar_network/db/index.py
@@ -13,12 +13,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import time
import shutil
import logging
-from os.path import exists, join
import xapian
@@ -60,21 +58,12 @@ class IndexReader(object):
self._db = None
self._props = {}
self._path = root
- self._mtime_path = join(self._path, 'mtime')
self._commit_cb = commit_cb
for name, prop in self.metadata.items():
if prop.indexed:
self._props[name] = prop
- @property
- def mtime(self):
- """UNIX seconds of the last `commit()` call."""
- if exists(self._mtime_path):
- return int(os.stat(self._mtime_path).st_mtime)
- else:
- return 0
-
def ensure_open(self):
pass
@@ -418,17 +407,10 @@ class IndexWriter(IndexReader):
self._db.commit()
else:
self._db.flush()
-
- checkpoint = time.time()
- if exists(self._mtime_path):
- os.utime(self._mtime_path, (checkpoint, checkpoint))
- else:
- with file(self._mtime_path, 'w'):
- pass
self._pending_updates = 0
_logger.debug('Commit to %r took %s seconds',
- self.metadata.name, checkpoint - ts)
+ self.metadata.name, time.time() - ts)
if self._commit_cb is not None:
self._commit_cb()
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index e820fc9..d7d9065 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -149,6 +149,8 @@ class Property(object):
enforce(name == 'guid' or prefix != GUID_PREFIX,
'Prefix %r is reserved for internal needs in %r',
GUID_PREFIX, name)
+ enforce(acl ^ ACL.AUTHOR or acl & ACL.AUTH,
+ 'ACL.AUTHOR without ACL.AUTH')
self.setter = None
self.on_get = lambda self, x: x
@@ -202,6 +204,10 @@ class Property(object):
ACL.NAMES[mode], self.name)
+class Reference(Property):
+ pass
+
+
class Boolean(Property):
def typecast(self, value):
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index 2c2e46b..7560024 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -168,7 +168,7 @@ class Resource(object):
def diff(self, r, out_r=None):
patch = {}
for name, prop in self.metadata.items():
- if name == 'seqno' or prop.acl & (ACL.CALC | ACL.LOCAL):
+ if name == 'seqno' or prop.acl & ACL.LOCAL:
continue
meta = self.meta(name)
if meta is None:
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index a1bb75e..0ea1305 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -15,21 +15,17 @@
# pylint: disable-msg=W0611
-import re
import logging
from contextlib import contextmanager
from sugar_network import toolkit
from sugar_network.db.metadata import Aggregated
from sugar_network.toolkit.router import ACL, File
-from sugar_network.toolkit.router import route, postroute, fallbackroute
+from sugar_network.toolkit.router import route, fallbackroute, preroute
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, parcel, ranges, enforce
+from sugar_network.toolkit import http, ranges, enforce
-_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$')
-_GROUPED_DIFF_LIMIT = 1024
-
_logger = logging.getLogger('db.routes')
@@ -37,18 +33,12 @@ class Routes(object):
def __init__(self, volume, find_limit=None):
this.volume = self.volume = volume
+ this.add_property('resource', _get_resource)
self._find_limit = find_limit
- @postroute
- def postroute(self, result, exception):
- request = this.request
- if not request.guid:
- return result
- pull = request.headers['pull']
- if pull is None:
- return result
- this.response.content_type = 'application/octet-stream'
- return self._object_diff(pull)
+ @preroute
+ def __preroute__(self, op):
+ this.reset_property('resource')
@route('POST', [None], acl=ACL.AUTH, mime_type='application/json')
def create(self):
@@ -71,11 +61,7 @@ class Routes(object):
@route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update_prop(self):
request = this.request
- if request.content is None:
- value = request.content_stream
- else:
- value = request.content
- request.content = {request.prop: value}
+ request.content = {request.prop: request.content}
self.update()
@route('DELETE', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
@@ -133,17 +119,17 @@ class Routes(object):
return self.get_prop()
@route('POST', [None, None, None],
- acl=ACL.AUTH, mime_type='application/json')
+ acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json')
def insert_to_aggprop(self):
return self._aggpost(ACL.INSERT)
@route('PUT', [None, None, None, None],
- acl=ACL.AUTH, mime_type='application/json')
+ acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json')
def update_aggprop(self):
self._aggpost(ACL.REPLACE)
@route('DELETE', [None, None, None, None],
- acl=ACL.AUTH, mime_type='application/json')
+ acl=ACL.AUTH | ACL.AGG_AUTHOR, mime_type='application/json')
def remove_from_aggprop(self):
self._aggpost(ACL.REMOVE)
@@ -180,80 +166,17 @@ class Routes(object):
del authors[user]
directory.update(request.guid, {'author': authors})
- @route('GET', [None], cmd='diff', mime_type='application/json')
- def grouped_diff(self, key):
- if not key:
- key = 'guid'
- in_r = this.request.headers['range'] or [[1, None]]
- out_r = []
- diff = set()
-
- for doc in self.volume[this.request.resource].diff(in_r):
- diff.add(doc.guid)
- if len(diff) > _GROUPED_DIFF_LIMIT:
- break
- ranges.include(out_r, doc['seqno'], doc['seqno'])
- doc.diff(in_r, out_r)
-
- return out_r, list(diff)
-
- @route('GET', [None, None], cmd='diff')
- def object_diff(self):
- return self._object_diff(this.request.headers['range'])
-
@fallbackroute('GET', ['blobs'])
def blobs(self):
return self.volume.blobs.get(this.request.guid)
- def _object_diff(self, in_r):
- request = this.request
- doc = self.volume[request.resource][request.guid]
- enforce(doc.exists, http.NotFound, 'Resource not found')
-
- out_r = []
- if in_r is None:
- in_r = [[1, None]]
- patch = doc.diff(in_r, out_r)
- if not patch:
- return parcel.encode([(None, None, [])], compresslevel=0)
-
- diff = [{'resource': request.resource},
- {'guid': request.guid, 'patch': patch},
- ]
-
- def add_blob(blob):
- if not isinstance(blob, File):
- return
- seqno = int(blob.meta['x-seqno'])
- ranges.include(out_r, seqno, seqno)
- diff.append(blob)
-
- for prop, meta in patch.items():
- prop = doc.metadata[prop]
- value = prop.reprcast(meta['value'])
- if isinstance(prop, Aggregated):
- for __, aggvalue in value:
- add_blob(aggvalue)
- else:
- add_blob(value)
- diff.append({'commit': out_r})
-
- return parcel.encode([(None, None, diff)], compresslevel=0)
-
@contextmanager
def _post(self, access):
content = this.request.content
enforce(isinstance(content, dict), http.BadRequest, 'Invalid value')
if access == ACL.CREATE:
- guid = content.get('guid')
- if guid:
- enforce(this.principal and this.principal.admin,
- http.BadRequest, 'GUID should not be specified')
- enforce(_GUID_RE.match(guid) is not None,
- http.BadRequest, 'Malformed GUID')
- else:
- guid = toolkit.uuid()
+ guid = content.get('guid') or toolkit.uuid()
doc = self.volume[this.request.resource][guid]
enforce(not doc.exists, 'Resource already exists')
doc.posts['guid'] = guid
@@ -261,6 +184,8 @@ class Routes(object):
if name not in content and prop.default is not None:
doc.posts[name] = prop.default
else:
+ enforce('guid' not in content, http.BadRequest,
+ 'GUID in cannot be changed')
doc = self.volume[this.request.resource][this.request.guid]
enforce(doc.available, 'Resource not found')
this.resource = doc
@@ -334,27 +259,16 @@ class Routes(object):
'Property is not aggregated')
prop.assert_access(acl)
- def enforce_authority(author):
- if prop.acl & ACL.AUTHOR:
- author = doc['author']
- enforce(not author or this.principal in author or
- this.principal and this.principal.admin,
- http.Forbidden, 'Authors only')
-
aggid = request.key
if aggid and aggid in doc[request.prop]:
aggvalue = doc[request.prop][aggid]
- enforce_authority(aggvalue.get('author'))
prop.subteardown(aggvalue['value'])
else:
enforce(acl != ACL.REMOVE, http.NotFound, 'No aggregated item')
- enforce_authority(None)
aggvalue = {}
if acl != ACL.REMOVE:
- value = prop.subtypecast(
- request.content_stream if request.content is None
- else request.content)
+ value = prop.subtypecast(request.content)
if type(value) is tuple:
aggid_, value = value
enforce(not aggid or aggid == aggid_, http.BadRequest,
@@ -373,3 +287,8 @@ class Routes(object):
self.volume[request.resource].update(request.guid, doc.posts)
return aggid
+
+
+def _get_resource():
+ request = this.request
+ return this.volume[request.resource][request.guid]
diff --git a/sugar_network/db/storage.py b/sugar_network/db/storage.py
index bbb50db..87d08b3 100644
--- a/sugar_network/db/storage.py
+++ b/sugar_network/db/storage.py
@@ -132,3 +132,8 @@ class Record(object):
# Touch directory to let it possible to crawl it on startup
# when index was not previously closed properly
os.utime(join(self._root, '..'), (mtime, mtime))
+
+ def unset(self, prop):
+ meta_path = join(self._root, prop)
+ if exists(meta_path):
+ os.unlink(meta_path)
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 382176c..25ae1bb 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -15,16 +15,14 @@
import os
import logging
-from copy import deepcopy
from os.path import exists, join, abspath
from sugar_network import toolkit
from sugar_network.db.directory import Directory
from sugar_network.db.index import IndexWriter
from sugar_network.db.blobs import Blobs
-from sugar_network.toolkit.router import File
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, coroutine, ranges, enforce
+from sugar_network.toolkit import http, coroutine, enforce
_logger = logging.getLogger('db.volume')
@@ -70,6 +68,20 @@ class Volume(dict):
return False
return True
+ @property
+ def has_seqno(self):
+ for directory in self.values():
+ if directory.has_seqno:
+ return True
+ return False
+
+ @property
+ def has_noseqno(self):
+ for directory in self.values():
+ if directory.has_noseqno:
+ return True
+ return False
+
def close(self):
"""Close operations with the server."""
_logger.info('Closing documents in %r', self._root)
@@ -83,74 +95,6 @@ class Volume(dict):
for __ in cls.populate():
coroutine.dispatch()
- def diff(self, r, exclude=None, files=None, blobs=True, one_way=False):
- if exclude:
- include = deepcopy(r)
- ranges.exclude(include, exclude)
- else:
- include = r
- last_seqno = None
- found = False
-
- try:
- for resource, directory in self.items():
- if one_way and directory.resource.one_way:
- continue
- yield {'resource': resource}
- for doc in directory.diff(r):
- patch = doc.diff(include)
- if patch:
- yield {'guid': doc.guid, 'patch': patch}
- found = True
- last_seqno = max(last_seqno, doc['seqno'])
- if blobs:
- for blob in self.blobs.diff(include):
- seqno = int(blob.meta.pop('x-seqno'))
- yield blob
- found = True
- last_seqno = max(last_seqno, seqno)
- for dirpath in files or []:
- for blob in self.blobs.diff(include, dirpath):
- seqno = int(blob.meta.pop('x-seqno'))
- yield blob
- found = True
- last_seqno = max(last_seqno, seqno)
- except StopIteration:
- pass
-
- if found:
- commit_r = include if exclude else deepcopy(r)
- ranges.exclude(commit_r, last_seqno + 1, None)
- ranges.exclude(r, None, last_seqno)
- yield {'commit': commit_r}
-
- def patch(self, records, shift_seqno=True):
- directory = None
- committed = []
- seqno = None if shift_seqno else False
-
- for record in records:
- if isinstance(record, File):
- if seqno is None:
- seqno = self.seqno.next()
- self.blobs.patch(record, seqno or 0)
- continue
- resource = record.get('resource')
- if resource:
- directory = self[resource]
- continue
- guid = record.get('guid')
- if guid is not None:
- seqno = directory.patch(guid, record['patch'], seqno)
- continue
- commit = record.get('commit')
- if commit is not None:
- ranges.include(committed, commit)
- continue
- raise http.BadRequest('Malformed patch')
-
- return seqno, committed
-
def broadcast(self, event):
if not self.mute:
if event['event'] == 'commit':
diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py
index 3f6aef1..6a314f5 100644
--- a/sugar_network/model/__init__.py
+++ b/sugar_network/model/__init__.py
@@ -15,22 +15,17 @@
import os
import gettext
-import logging
-import mimetypes
from os.path import join
import xapian
-from sugar_network import toolkit, db
+from sugar_network import db
from sugar_network.model.routes import FrontRoutes
-from sugar_network.toolkit.spec import parse_version, parse_requires
-from sugar_network.toolkit.spec import EMPTY_LICENSE
-from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit.bundle import Bundle
-from sugar_network.toolkit.router import ACL
-from sugar_network.toolkit import i18n, http, svg_to_png, enforce
+ICON_SIZE = 55
+LOGO_SIZE = 140
+
CONTEXT_TYPES = [
'activity', 'group', 'package', 'book',
]
@@ -58,11 +53,6 @@ RESOURCES = (
'sugar_network.model.user',
)
-ICON_SIZE = 55
-LOGO_SIZE = 140
-
-_logger = logging.getLogger('model')
-
class Rating(db.List):
@@ -72,229 +62,3 @@ class Rating(db.List):
def slotting(self, value):
rating = float(value[1]) / value[0] if value[0] else 0
return xapian.sortable_serialise(rating)
-
-
-class Release(object):
-
- def typecast(self, release):
- if this.resource.exists and \
- 'activity' not in this.resource['type'] and \
- 'book' not in this.resource['type']:
- return release
- if not isinstance(release, dict):
- __, release = load_bundle(
- this.volume.blobs.post(release, this.request.content_type),
- context=this.request.guid)
- return release['bundles']['*-*']['blob'], release
-
- def reprcast(self, release):
- return this.volume.blobs.get(release['bundles']['*-*']['blob'])
-
- def teardown(self, release):
- if this.resource.exists and \
- 'activity' not in this.resource['type'] and \
- 'book' not in this.resource['type']:
- return
- for bundle in release['bundles'].values():
- this.volume.blobs.delete(bundle['blob'])
-
- def encode(self, value):
- return []
-
-
-def generate_node_stats(volume):
-
- def calc_rating(**kwargs):
- rating = [0, 0]
- alldocs, __ = volume['post'].find(**kwargs)
- for post in alldocs:
- if post['vote']:
- rating[0] += 1
- rating[1] += post['vote']
- return rating
-
- alldocs, __ = volume['context'].find()
- for context in alldocs:
- rating = calc_rating(type='review', context=context.guid)
- volume['context'].update(context.guid, {'rating': rating})
-
- alldocs, __ = volume['post'].find(topic='')
- for topic in alldocs:
- rating = calc_rating(type='feedback', topic=topic.guid)
- volume['post'].update(topic.guid, {'rating': rating})
-
-
-def load_bundle(blob, context=None, initial=False, extra_deps=None):
- context_type = None
- context_meta = None
- release_notes = None
- release = {}
- version = None
-
- try:
- bundle = Bundle(blob.path, mime_type='application/zip')
- except Exception:
- context_type = 'book'
- if not context:
- context = this.request['context']
- version = this.request['version']
- if 'license' in this.request:
- release['license'] = this.request['license']
- if isinstance(release['license'], basestring):
- release['license'] = [release['license']]
- release['stability'] = 'stable'
- release['bundles'] = {
- '*-*': {
- 'blob': blob.digest,
- },
- }
- else:
- context_type = 'activity'
- unpack_size = 0
-
- with bundle:
- changelog = join(bundle.rootdir, 'CHANGELOG')
- for arcname in bundle.get_names():
- if changelog and arcname == changelog:
- with bundle.extractfile(changelog) as f:
- release_notes = f.read()
- changelog = None
- unpack_size += bundle.getmember(arcname).size
- spec = bundle.get_spec()
- context_meta = _load_context_metadata(bundle, spec)
-
- if not context:
- context = spec['context']
- else:
- enforce(context == spec['context'],
- http.BadRequest, 'Wrong context')
- if extra_deps:
- spec.requires.update(parse_requires(extra_deps))
-
- version = spec['version']
- release['stability'] = spec['stability']
- if spec['license'] is not EMPTY_LICENSE:
- release['license'] = spec['license']
- release['commands'] = spec.commands
- release['requires'] = spec.requires
- release['bundles'] = {
- '*-*': {
- 'blob': blob.digest,
- 'unpack_size': unpack_size,
- },
- }
- blob.meta['content-type'] = 'application/vnd.olpc-sugar'
-
- enforce(context, http.BadRequest, 'Context is not specified')
- enforce(version, http.BadRequest, 'Version is not specified')
- release['version'] = parse_version(version)
-
- doc = this.volume['context'][context]
- if initial and not doc.exists:
- enforce(context_meta, http.BadRequest, 'No way to initate context')
- context_meta['guid'] = context
- context_meta['type'] = [context_type]
- with this.principal as principal:
- principal.admin = True
- this.call(method='POST', path=['context'], content=context_meta,
- principal=principal)
- else:
- enforce(doc.available, http.NotFound, 'No context')
- enforce(context_type in doc['type'],
- http.BadRequest, 'Inappropriate bundle type')
-
- if 'license' not in release:
- releases = doc['releases'].values()
- enforce(releases, http.BadRequest, 'License is not specified')
- recent = max(releases, key=lambda x: x.get('value', {}).get('release'))
- enforce(recent, http.BadRequest, 'License is not specified')
- release['license'] = recent['value']['license']
-
- _logger.debug('Load %r release: %r', context, release)
-
- if this.principal in doc['author']:
- patch = doc.format_patch(context_meta)
- if patch:
- this.call(method='PUT', path=['context', context], content=patch,
- principal=this.principal)
- doc.posts.update(patch)
- # TRANS: Release notes title
- title = i18n._('%(name)s %(version)s release')
- else:
- # TRANS: 3rd party release notes title
- title = i18n._('%(name)s %(version)s third-party release')
- release['announce'] = this.call(method='POST', path=['post'],
- content={
- 'context': context,
- 'type': 'notification',
- 'title': i18n.encode(title,
- name=doc['title'],
- version=version,
- ),
- 'message': release_notes or '',
- },
- content_type='application/json', principal=this.principal)
-
- blob.meta['content-disposition'] = 'attachment; filename="%s-%s%s"' % (
- ''.join(i18n.decode(doc['title']).split()), version,
- mimetypes.guess_extension(blob.meta.get('content-type')) or '',
- )
- this.volume.blobs.update(blob.digest, blob.meta)
-
- return context, release
-
-
-def _load_context_metadata(bundle, spec):
- result = {}
- for prop in ('homepage', 'mime_types'):
- if spec[prop]:
- result[prop] = spec[prop]
- result['guid'] = spec['context']
-
- try:
- from sugar_network.toolkit.sugar import color_svg
-
- icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon']))
- svg = color_svg(icon_file.read(), result['guid'])
- blobs = this.volume.blobs
-
- result['artefact_icon'] = \
- blobs.post(svg, 'image/svg+xml').digest
- result['icon'] = \
- blobs.post(svg_to_png(svg, ICON_SIZE), 'image/png').digest
- result['logo'] = \
- blobs.post(svg_to_png(svg, LOGO_SIZE), 'image/png').digest
-
- icon_file.close()
- except Exception:
- _logger.exception('Failed to load icon')
-
- msgids = {}
- for prop, confname in [
- ('title', 'name'),
- ('summary', 'summary'),
- ('description', 'description'),
- ]:
- if spec[confname]:
- msgids[prop] = spec[confname]
- result[prop] = {'en': spec[confname]}
- with toolkit.mkdtemp() as tmpdir:
- for path in bundle.get_names():
- if not path.endswith('.mo'):
- continue
- mo_path = path.strip(os.sep).split(os.sep)
- if len(mo_path) != 5 or mo_path[1] != 'locale':
- continue
- lang = mo_path[2]
- bundle.extract(path, tmpdir)
- try:
- translation = gettext.translation(spec['context'],
- join(tmpdir, *mo_path[:2]), [lang])
- for prop, value in msgids.items():
- msgstr = translation.gettext(value).decode('utf8')
- if lang == 'en' or msgstr != value:
- result[prop][lang] = msgstr
- except Exception:
- _logger.exception('Gettext failed to read %r', mo_path[-1])
-
- return result
diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py
index 9153552..cf24650 100644
--- a/sugar_network/model/context.py
+++ b/sugar_network/model/context.py
@@ -95,17 +95,17 @@ class Context(db.Resource):
def previews(self, value):
return value
- @db.stored_property(db.Aggregated, subtype=model.Release(),
- acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE)
+ @db.stored_property(db.Aggregated, subtype=db.Dict(),
+ acl=ACL.READ | ACL.LOCAL)
def releases(self, value):
return value
@db.indexed_property(db.Numeric, slot=2, default=0,
- acl=ACL.READ | ACL.CALC)
+ acl=ACL.READ | ACL.LOCAL)
def downloads(self, value):
return value
- @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.CALC)
+ @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.LOCAL)
def rating(self, value):
return value
diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py
index d924617..e0b3b25 100644
--- a/sugar_network/model/post.py
+++ b/sugar_network/model/post.py
@@ -20,11 +20,12 @@ from sugar_network.toolkit.coroutine import this
class Post(db.Resource):
- @db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ)
+ @db.indexed_property(db.Reference, prefix='C', acl=ACL.CREATE | ACL.READ)
def context(self, value):
return value
- @db.indexed_property(prefix='A', default='', acl=ACL.CREATE | ACL.READ)
+ @db.indexed_property(db.Reference, prefix='A', default='',
+ acl=ACL.CREATE | ACL.READ)
def topic(self, value):
return value
@@ -42,7 +43,7 @@ class Post(db.Resource):
def message(self, value):
return value
- @db.indexed_property(prefix='R', default='')
+ @db.indexed_property(db.Reference, prefix='R', default='')
def solution(self, value):
return value
@@ -82,10 +83,10 @@ class Post(db.Resource):
return value
@db.indexed_property(db.Numeric, slot=2, default=0,
- acl=ACL.READ | ACL.CALC)
+ acl=ACL.READ | ACL.LOCAL)
def downloads(self, value):
return value
- @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.CALC)
+ @db.indexed_property(model.Rating, slot=3, acl=ACL.READ | ACL.LOCAL)
def rating(self, value):
return value
diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py
index a434a6d..4f201d5 100644
--- a/sugar_network/model/report.py
+++ b/sugar_network/model/report.py
@@ -34,7 +34,7 @@ class Report(db.Resource):
one_way = True
- @db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ)
+ @db.indexed_property(db.Reference, prefix='C', acl=ACL.CREATE | ACL.READ)
def context(self, value):
return value
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index 8012853..63c98b1 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -58,7 +58,7 @@ class FrontRoutes(object):
# initiate a subscription and do not stuck in waiting for the 1st event
yield {'event': 'pong'}
- subscription = this.request.content_stream
+ subscription = this.request.content
if subscription is not None:
coroutine.spawn(self._wait_for_closing, subscription)
diff --git a/sugar_network/node/auth.py b/sugar_network/node/auth.py
index 00054f5..d14bde6 100644
--- a/sugar_network/node/auth.py
+++ b/sugar_network/node/auth.py
@@ -40,19 +40,46 @@ class Unauthorized(http.Unauthorized):
class Principal(str):
    """An authenticated user id with a capability bitmask.

    The string value is the user identity; `_caps` is a bitmask of
    capabilities granted by the node's auth configuration. The instance
    can be used as a context manager to temporarily raise capabilities:
    `__enter__` snapshots the mask and `__exit__` restores it.

    Bug fix vs the original: both `cap_author_override` and
    `cap_create_with_guid` used bit 1, so granting or revoking one
    silently toggled the other; they now use distinct bits (1 and 2).
    Additionally, clearing a capability used `^=` (XOR), which would
    *set* the bit if it was already clear; clearing now uses `&= ~mask`,
    which is idempotent.
    """

    # Capability bit masks.
    _CAP_AUTHOR_OVERRIDE = 1
    _CAP_CREATE_WITH_GUID = 2

    def __new__(cls, value, caps=0):
        # Accept either (value, caps) as two arguments or a single
        # (value, caps) tuple, as produced by dump().
        if not isinstance(value, basestring):
            value, caps = value
        self = str.__new__(cls, value)
        # pylint: disable-msg=W0212
        self._caps = caps
        self._backup = 0
        return self

    @property
    def cap_author_override(self):
        """Non-zero if the principal may edit resources it does not author."""
        return self._caps & self._CAP_AUTHOR_OVERRIDE

    @cap_author_override.setter
    def cap_author_override(self, value):
        if value:
            self._caps |= self._CAP_AUTHOR_OVERRIDE
        else:
            self._caps &= ~self._CAP_AUTHOR_OVERRIDE

    @property
    def cap_create_with_guid(self):
        """Non-zero if the principal may POST resources with explicit GUIDs."""
        return self._caps & self._CAP_CREATE_WITH_GUID

    @cap_create_with_guid.setter
    def cap_create_with_guid(self, value):
        if value:
            self._caps |= self._CAP_CREATE_WITH_GUID
        else:
            self._caps &= ~self._CAP_CREATE_WITH_GUID

    def __enter__(self):
        # Snapshot capabilities for temporary escalation.
        self._backup = self._caps
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Restore the snapshot regardless of exceptions.
        self._caps = self._backup

    def dump(self):
        """Serialize to a (value, caps) pair accepted by __new__."""
        return self, self._caps
class SugarAuth(object):
@@ -109,10 +136,8 @@ class SugarAuth(object):
for role in self._config.get('permissions', user).split():
role = role.lower()
if role == 'admin':
- principal.admin = True
- elif role == 'editor':
- principal.editor = True
- elif role == 'translator':
- principal.translator = True
+ principal.cap_author_override = True
+ principal.cap_create_with_guid = True
+ # TODO
return principal
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index c5b15e6..c94d047 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -19,15 +19,14 @@ from urlparse import urlsplit
from sugar_network import toolkit
from sugar_network.model.post import Post
from sugar_network.model.report import Report
-from sugar_network.node.model import User, Context
-from sugar_network.node import obs, master_api
+from sugar_network.node import obs, master_api, model
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, parcel, pylru, ranges, enforce
+from sugar_network.toolkit import http, packets, pylru, ranges, enforce
-RESOURCES = (User, Context, Post, Report)
+RESOURCES = (model.User, model.Context, Post, Report)
_logger = logging.getLogger('node.master')
@@ -40,20 +39,20 @@ class MasterRoutes(NodeRoutes):
@route('POST', cmd='sync', arguments={'accept_length': int})
def sync(self, accept_length):
- return parcel.encode(self._push() + (self._pull() or []),
+ return packets.encode(self._push() + (self._pull() or []),
limit=accept_length, header={'from': self.guid},
on_complete=this.cookie.clear)
@route('POST', cmd='push')
def push(self):
- return parcel.encode(self._push(), header={'from': self.guid})
+ return packets.encode(self._push(), header={'from': self.guid})
@route('GET', cmd='pull', arguments={'accept_length': int})
def pull(self, accept_length):
reply = self._pull()
if reply is None:
return None
- return parcel.encode(reply, limit=accept_length,
+ return packets.encode(reply, limit=accept_length,
header={'from': self.guid}, on_complete=this.cookie.clear)
@route('PUT', ['context', None], cmd='presolve',
@@ -72,13 +71,13 @@ class MasterRoutes(NodeRoutes):
cookie = this.cookie
reply = []
- for packet in parcel.decode(
- this.request.content_stream, this.request.content_length):
+ for packet in packets.decode(
+ this.request.content, this.request.content_length):
sender = packet['from']
enforce(packet['to'] == self.guid, http.BadRequest,
'Misaddressed packet')
if packet.name == 'push':
- seqno, push_r = this.volume.patch(packet)
+ seqno, push_r = model.patch_volume(packet)
ack_r = [] if seqno is None else [[seqno, seqno]]
ack = {'ack': ack_r, 'ranges': push_r, 'to': sender}
reply.append(('ack', ack, None))
@@ -129,7 +128,7 @@ class MasterRoutes(NodeRoutes):
r = reduce(lambda x, y: ranges.intersect(x, y), acked.values())
ranges.include(exclude, r)
- push = this.volume.diff(pull_r, exclude, one_way=True, files=[''])
+ push = model.diff_volume(pull_r, exclude, one_way=True, files=[''])
reply.append(('push', None, push))
return reply
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index 144dab0..f178913 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -13,19 +13,32 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import json
import bisect
import hashlib
import logging
+import gettext
+import mimetypes
+from copy import deepcopy
from os.path import join
from sugar_network import db, toolkit
-from sugar_network.model import Release, context as _context, user as _user
+from sugar_network.model import context as _context, user as _user
+from sugar_network.model import ICON_SIZE, LOGO_SIZE
from sugar_network.node import obs
-from sugar_network.toolkit.router import ACL
-from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import spec, sat, http, coroutine, i18n, enforce
+from sugar_network.node.auth import Principal
+from sugar_network.toolkit.router import ACL, File, Request, Response
+from sugar_network.toolkit.coroutine import Queue, this
+from sugar_network.toolkit.spec import EMPTY_LICENSE, ensure_version
+from sugar_network.toolkit.spec import parse_requires, parse_version
+from sugar_network.toolkit.bundle import Bundle
+from sugar_network.toolkit import sat, http, i18n, ranges, packets
+from sugar_network.toolkit import svg_to_png, enforce
+BATCH_SUFFIX = '.meta'
+
_logger = logging.getLogger('node.model')
_presolve_queue = None
@@ -36,73 +49,42 @@ class User(_user.User):
self.posts['guid'] = str(hashlib.sha1(self['pubkey']).hexdigest())
-class _Release(Release):
class _ReleaseValue(dict):
    """Release payload dict that also carries its aggregated-item id.

    The dict holds the release properties; `guid` holds the key under
    which the release is stored in the `releases` aggregate (set to the
    bundle blob digest by load_bundle, or to the request key by
    _Release.typecast for package contexts).
    """

    # Aggregated-item id for this release; class-level None means
    # "not assigned yet".
    guid = None
- def typecast(self, value):
- if not this.resource.exists or 'package' not in this.resource['type']:
- return Release.typecast(self, value)
-
- value = self._package_cast.typecast(value)
- enforce(value.get('binary'), http.BadRequest, 'No binary aliases')
-
- distro = this.request.key
- if distro == '*':
- lsb_id = None
- lsb_release = None
- elif '-' in this.request.key:
- lsb_id, lsb_release = distro.split('-', 1)
- else:
- lsb_id = distro
- lsb_release = None
- releases = this.resource.record.get('releases')
- resolves = releases['value'].setdefault('resolves', {})
- to_presolve = []
-
- for repo in obs.get_repos():
- if lsb_id and lsb_id != repo['lsb_id'] or \
- lsb_release and lsb_release != repo['lsb_release']:
- continue
- # Make sure there are no alias overrides
- if not lsb_id and repo['lsb_id'] in releases['value'] or \
- not lsb_release and repo['name'] in releases['value']:
- continue
- pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], [])
- version = None
- try:
- for arch in repo['arches']:
- version = obs.resolve(repo['name'], arch, pkgs)['version']
- except Exception, error:
- _logger.warning('Failed to resolve %r on %s',
- pkgs, repo['name'])
- resolve = {'status': str(error)}
- else:
- to_presolve.append((repo['name'], pkgs))
- resolve = {
- 'version': spec.parse_version(version),
- 'packages': pkgs,
- 'status': 'success',
- }
- resolves.setdefault(repo['name'], {}).update(resolve)
- if to_presolve and _presolve_queue is not None:
- _presolve_queue.put(to_presolve)
- if resolves:
- this.resource.record.set('releases', **releases)
class _Release(object):
    """Subtype handler for the `releases` aggregated property of Context."""

    # Subcast for package-alias values, e.g. {'binary': [...], 'devel': [...]}.
    _package_subcast = db.Dict(db.List())

    def typecast(self, value):
        """Normalize an incoming release value.

        Already-normalized `_ReleaseValue` objects pass straight through as
        a (guid, value) pair. For `package` contexts the value is treated
        as distro package aliases and resolved; otherwise it is an uploaded
        bundle which is stored as a blob and parsed by load_bundle.
        """
        if isinstance(value, _ReleaseValue):
            return value.guid, value
        doc = this.volume['context'][this.request.guid]
        if 'package' in doc['type']:
            value = _ReleaseValue(self._package_subcast.typecast(value))
            value.guid = this.request.key
            _resolve_package_alias(doc, value)
            # NOTE(review): this branch returns the bare value while the
            # other branches return (guid, value) — confirm the aggregate
            # machinery accepts both shapes.
            return value
        bundle = this.volume.blobs.post(value, this.request.content_type)
        __, value = load_bundle(bundle, context=this.request.guid)
        return value.guid, value

    def encode(self, value):
        # Releases contribute nothing to full-text indexing.
        return []

    def teardown(self, value):
        """Delete blobs referenced by a removed release, if any."""
        if 'bundles' in value:
            for bundle in value['bundles'].values():
                this.volume.blobs.delete(bundle['blob'])
        # TODO Delete presolved files
class Context(_context.Context):
@db.stored_property(db.Aggregated, subtype=_Release(),
- acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE)
+ acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE | ACL.LOCAL)
def releases(self, value):
return value
@@ -135,6 +117,168 @@ class Volume(db.Volume):
self.release_seqno.commit()
def diff_volume(r, exclude=None, files=None, blobs=True, one_way=False):
    """Yield a patch stream covering seqno ranges `r` of the whole volume.

    Yields, in order: {'resource': name} headers, {'guid', 'patch'}
    document diffs, blob Files (when `blobs` is true), files from the
    extra `files` directories, and a final {'commit': ranges} record
    describing exactly what was exported. `r` is mutated in place:
    exported ranges are excluded from it so the caller can resume later.
    `exclude` narrows the exported ranges; `one_way` skips resources
    flagged as one_way.
    """
    volume = this.volume
    if exclude:
        include = deepcopy(r)
        ranges.exclude(include, exclude)
    else:
        include = r
    last_seqno = None
    found = False

    try:
        for resource, directory in volume.items():
            if one_way and directory.resource.one_way:
                continue
            yield {'resource': resource}
            for doc in directory.diff(r):
                patch = doc.diff(include)
                if patch:
                    yield {'guid': doc.guid, 'patch': patch}
                    found = True
                    # None compares smaller than any int under Python 2,
                    # so max() seeds last_seqno on the first hit.
                    last_seqno = max(last_seqno, doc['seqno'])
        if blobs:
            for blob in volume.blobs.diff(include):
                seqno = int(blob.meta.pop('x-seqno'))
                yield blob
                found = True
                last_seqno = max(last_seqno, seqno)
        for dirpath in files or []:
            for blob in volume.blobs.diff(include, dirpath):
                seqno = int(blob.meta.pop('x-seqno'))
                yield blob
                found = True
                last_seqno = max(last_seqno, seqno)
    except StopIteration:
        # The consumer may close the generator early; stop quietly.
        pass

    if found:
        # Report the actually exported ranges and shrink `r` accordingly.
        commit_r = include if exclude else deepcopy(r)
        ranges.exclude(commit_r, last_seqno + 1, None)
        ranges.exclude(r, None, last_seqno)
        yield {'commit': commit_r}
+
+
def patch_volume(records, shift_seqno=True):
    """Apply a decoded patch stream to the current volume.

    `records` is an iterable mixing four record kinds:
    - File objects: blob payloads stored via volume.blobs.patch;
    - {'resource': name}: switches the target directory for subsequent
      guid records;
    - {'guid': ..., 'patch': ...}: per-document patch for the current
      directory;
    - {'commit': ranges}: ranges the sender considers committed,
      accumulated into the returned `committed` list.

    When `shift_seqno` is true a new local seqno is allocated lazily on
    the first applied change; otherwise seqno is pinned to False so the
    incoming seqnos are kept. Returns (seqno, committed). Raises
    http.BadRequest on an unrecognized record.
    """
    volume = this.volume
    directory = None
    committed = []
    # Tri-state: None = "allocate on first use", False = "do not shift".
    seqno = None if shift_seqno else False

    for record in records:
        if isinstance(record, File):
            if seqno is None:
                seqno = volume.seqno.next()
            volume.blobs.patch(record, seqno or 0)
            continue
        resource = record.get('resource')
        if resource:
            directory = volume[resource]
            continue
        guid = record.get('guid')
        if guid is not None:
            # Relies on a preceding {'resource': ...} record having set
            # `directory`; a stream missing it would fail here.
            seqno = directory.patch(guid, record['patch'], seqno)
            continue
        commit = record.get('commit')
        if commit is not None:
            ranges.include(committed, commit)
            continue
        raise http.BadRequest('Malformed patch')

    return seqno, committed
+
+
def diff_resource(in_r):
    """Encode a single resource's patch (document props plus blobs).

    `in_r` lists the seqno ranges to export; None means everything
    ([[1, None]]). Returns a packets-encoded stream with the document
    patch, the blob Files it references, and the covered `ranges`.
    Refused for the `user` resource; raises http.NotFound when the
    document does not exist.
    """
    request = this.request
    enforce(request.resource != 'user', http.BadRequest,
            'Not allowed for User resource')
    doc = this.volume[request.resource][request.guid]
    enforce(doc.exists, http.NotFound, 'Resource not found')

    out_r = []
    if in_r is None:
        in_r = [[1, None]]
    patch = doc.diff(in_r, out_r)
    if not patch:
        # Nothing changed in the requested ranges: send an empty packet.
        return packets.encode([], compresslevel=0)
    blobs = []

    def add_blob(blob):
        # Collect File values and fold their seqnos into the out ranges;
        # non-File values carry no blob payload.
        if not isinstance(blob, File):
            return
        seqno = int(blob.meta['x-seqno'])
        ranges.include(out_r, seqno, seqno)
        blobs.append(blob)

    for prop, meta in patch.items():
        prop = doc.metadata[prop]
        value = prop.reprcast(meta['value'])
        if isinstance(prop, db.Aggregated):
            for __, aggvalue in value:
                add_blob(aggvalue)
        else:
            add_blob(value)

    return packets.encode(blobs, patch=patch, ranges=out_r, compresslevel=0)
+
+
def apply_batch(path):
    """Replay an offline batch of client requests stored at `path`.

    `path` holds a packets-encoded stream of recorded requests;
    `path + BATCH_SUFFIX` holds JSON metadata: the submitting principal,
    a `guid_map` from client-generated guids to locally assigned ones,
    and optionally `failed` record numbers from a previous partial run
    (when present, only those records are retried). Failed record numbers
    are written back to the meta file; on full success both files are
    removed.
    """
    with file(path + BATCH_SUFFIX) as f:
        meta = json.load(f)
    principal = Principal(meta['principal'])
    # Batches may carry client-side guids for new resources.
    principal.cap_create_with_guid = True
    only_nums = meta.get('failed')
    guid_map = meta.setdefault('guid_map', {})
    failed = meta['failed'] = []
    volume = this.volume

    def map_guid(remote_guid):
        # Map a client-side guid to a local one; a guid that already
        # exists locally is kept as-is. NOTE: reads `request` from the
        # enclosing loop iteration (assigned before each call).
        local_guid = guid_map.get(remote_guid)
        if not local_guid:
            if volume[request.resource][remote_guid].exists:
                return remote_guid
            local_guid = guid_map[remote_guid] = toolkit.uuid()
        return local_guid

    with file(path, 'rb') as batch:
        num = 0
        for record in packets.decode(batch):
            num += 1
            # On a retry run, skip records that succeeded previously.
            if only_nums and not ranges.contains(only_nums, num):
                continue
            if isinstance(record, File):
                # Blob upload: the originating request was stashed in
                # the blob's meta under 'op'.
                request = Request(**record.meta.pop('op'))
                request.content = record
            else:
                request = Request(**record['op'])
                props = record['content']
                keys = record.get('keys') or []
                enforce('guid' not in props or 'guid' in keys,
                        http.BadRequest, 'Guid values is not mapped')
                for key in keys:
                    enforce(key in props, http.BadRequest,
                            'No mapped property value')
                    props[key] = map_guid(props[key])
                request.content = props
            if request.guid and \
                    not volume[request.resource][request.guid].exists:
                request.guid = map_guid(request.guid)
            request.principal = principal
            try:
                this.call(request, Response())
            except Exception:
                # Best-effort replay: log and remember the record number
                # so a later run can retry just the failures.
                _logger.exception('Failed to apply %r', request)
                ranges.include(failed, num, num)

    if failed:
        with toolkit.new_file(path + BATCH_SUFFIX) as f:
            json.dump(meta, f)
    else:
        os.unlink(path + BATCH_SUFFIX)
        os.unlink(path)
+
+
def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
stability=None, requires=None):
top_context = volume['context'][top_context]
@@ -145,12 +289,12 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
top_cond = []
top_requires = {}
if isinstance(requires, basestring):
- top_requires.update(spec.parse_requires(requires))
+ top_requires.update(parse_requires(requires))
elif requires:
for i in requires:
- top_requires.update(spec.parse_requires(i))
+ top_requires.update(parse_requires(i))
if top_context['dependencies']:
- top_requires.update(spec.parse_requires(top_context['dependencies']))
+ top_requires.update(parse_requires(top_context['dependencies']))
if top_context.guid in top_requires:
top_cond = top_requires.pop(top_context.guid)
@@ -173,7 +317,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
for dep, cond in deps.items():
dep_clause = [-v_usage]
for v_release in add_context(dep):
- if spec.ensure(varset[v_release][1]['version'], cond):
+ if ensure_version(varset[v_release][1]['version'], cond):
dep_clause.append(v_release)
clauses.append(dep_clause)
@@ -211,7 +355,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
release = release['value']
if release['stability'] not in stability or \
context.guid == top_context.guid and \
- not spec.ensure(release['version'], top_cond):
+ not ensure_version(release['version'], top_cond):
continue
bisect.insort(candidates, rate_release(digest, release))
for release in reversed(candidates):
@@ -272,12 +416,259 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
def presolve(presolve_path):
global _presolve_queue
- _presolve_queue = coroutine.Queue()
+ _presolve_queue = Queue()
for repo_name, pkgs in _presolve_queue:
obs.presolve(repo_name, pkgs, presolve_path)
+def load_bundle(blob, context=None, initial=False, extra_deps=None):
+ context_type = None
+ context_meta = None
+ release_notes = None
+ version = None
+ release = _ReleaseValue()
+ release.guid = blob.digest
+
+ try:
+ bundle = Bundle(blob.path, mime_type='application/zip')
+ except Exception:
+ context_type = 'book'
+ if not context:
+ context = this.request['context']
+ version = this.request['version']
+ if 'license' in this.request:
+ release['license'] = this.request['license']
+ if isinstance(release['license'], basestring):
+ release['license'] = [release['license']]
+ release['stability'] = 'stable'
+ release['bundles'] = {
+ '*-*': {
+ 'blob': blob.digest,
+ },
+ }
+ else:
+ context_type = 'activity'
+ unpack_size = 0
+
+ with bundle:
+ changelog = join(bundle.rootdir, 'CHANGELOG')
+ for arcname in bundle.get_names():
+ if changelog and arcname == changelog:
+ with bundle.extractfile(changelog) as f:
+ release_notes = f.read()
+ changelog = None
+ unpack_size += bundle.getmember(arcname).size
+ spec = bundle.get_spec()
+ context_meta = _load_context_metadata(bundle, spec)
+
+ if not context:
+ context = spec['context']
+ else:
+ enforce(context == spec['context'],
+ http.BadRequest, 'Wrong context')
+ if extra_deps:
+ spec.requires.update(parse_requires(extra_deps))
+
+ version = spec['version']
+ release['stability'] = spec['stability']
+ if spec['license'] is not EMPTY_LICENSE:
+ release['license'] = spec['license']
+ release['commands'] = spec.commands
+ release['requires'] = spec.requires
+ release['bundles'] = {
+ '*-*': {
+ 'blob': blob.digest,
+ 'unpack_size': unpack_size,
+ },
+ }
+ blob.meta['content-type'] = 'application/vnd.olpc-sugar'
+
+ enforce(context, http.BadRequest, 'Context is not specified')
+ enforce(version, http.BadRequest, 'Version is not specified')
+ release['version'] = parse_version(version)
+
+ doc = this.volume['context'][context]
+ if initial and not doc.exists:
+ enforce(context_meta, http.BadRequest, 'No way to initiate context')
+ context_meta['guid'] = context
+ context_meta['type'] = [context_type]
+ with this.principal as principal:
+ principal.cap_create_with_guid = True
+ this.call(method='POST', path=['context'], content=context_meta,
+ principal=principal)
+ else:
+ enforce(doc.available, http.NotFound, 'No context')
+ enforce(context_type in doc['type'],
+ http.BadRequest, 'Inappropriate bundle type')
+
+ if 'license' not in release:
+ releases = doc['releases'].values()
+ enforce(releases, http.BadRequest, 'License is not specified')
+ recent = max(releases, key=lambda x: x.get('value', {}).get('release'))
+ enforce(recent, http.BadRequest, 'License is not specified')
+ release['license'] = recent['value']['license']
+
+ _logger.debug('Load %r release: %r', context, release)
+
+ if this.principal in doc['author']:
+ patch = doc.format_patch(context_meta)
+ if patch:
+ this.call(method='PUT', path=['context', context], content=patch,
+ principal=this.principal)
+ doc.posts.update(patch)
+ # TRANS: Release notes title
+ title = i18n._('%(name)s %(version)s release')
+ else:
+ # TRANS: 3rd party release notes title
+ title = i18n._('%(name)s %(version)s third-party release')
+ release['announce'] = this.call(method='POST', path=['post'],
+ content={
+ 'context': context,
+ 'type': 'notification',
+ 'title': i18n.encode(title,
+ name=doc['title'],
+ version=version,
+ ),
+ 'message': release_notes or '',
+ },
+ content_type='application/json', principal=this.principal)
+
+ blob.meta['content-disposition'] = 'attachment; filename="%s-%s%s"' % (
+ ''.join(i18n.decode(doc['title']).split()), version,
+ mimetypes.guess_extension(blob.meta.get('content-type')) or '',
+ )
+ this.volume.blobs.update(blob.digest, blob.meta)
+
+ return context, release
+
+
+def generate_node_stats(volume):
+
+ def calc_rating(**kwargs):
+ rating = [0, 0]
+ alldocs, __ = volume['post'].find(**kwargs)
+ for post in alldocs:
+ if post['vote']:
+ rating[0] += 1
+ rating[1] += post['vote']
+ return rating
+
+ alldocs, __ = volume['context'].find()
+ for context in alldocs:
+ rating = calc_rating(type='review', context=context.guid)
+ volume['context'].update(context.guid, {'rating': rating})
+
+ alldocs, __ = volume['post'].find(topic='')
+ for topic in alldocs:
+ rating = calc_rating(type='feedback', topic=topic.guid)
+ volume['post'].update(topic.guid, {'rating': rating})
+
+
+def _load_context_metadata(bundle, spec):
+ result = {}
+ for prop in ('homepage', 'mime_types'):
+ if spec[prop]:
+ result[prop] = spec[prop]
+ result['guid'] = spec['context']
+
+ try:
+ from sugar_network.toolkit.sugar import color_svg
+
+ icon_file = bundle.extractfile(join(bundle.rootdir, spec['icon']))
+ svg = color_svg(icon_file.read(), result['guid'])
+ blobs = this.volume.blobs
+
+ result['artefact_icon'] = \
+ blobs.post(svg, 'image/svg+xml').digest
+ result['icon'] = \
+ blobs.post(svg_to_png(svg, ICON_SIZE), 'image/png').digest
+ result['logo'] = \
+ blobs.post(svg_to_png(svg, LOGO_SIZE), 'image/png').digest
+
+ icon_file.close()
+ except Exception:
+ _logger.exception('Failed to load icon')
+
+ msgids = {}
+ for prop, confname in [
+ ('title', 'name'),
+ ('summary', 'summary'),
+ ('description', 'description'),
+ ]:
+ if spec[confname]:
+ msgids[prop] = spec[confname]
+ result[prop] = {'en': spec[confname]}
+ with toolkit.mkdtemp() as tmpdir:
+ for path in bundle.get_names():
+ if not path.endswith('.mo'):
+ continue
+ mo_path = path.strip(os.sep).split(os.sep)
+ if len(mo_path) != 5 or mo_path[1] != 'locale':
+ continue
+ lang = mo_path[2]
+ bundle.extract(path, tmpdir)
+ try:
+ translation = gettext.translation(spec['context'],
+ join(tmpdir, *mo_path[:2]), [lang])
+ for prop, value in msgids.items():
+ msgstr = translation.gettext(value).decode('utf8')
+ if lang == 'en' or msgstr != value:
+ result[prop][lang] = msgstr
+ except Exception:
+ _logger.exception('Gettext failed to read %r', mo_path[-1])
+
+ return result
+
+
+def _resolve_package_alias(doc, value):
+ enforce(value.get('binary'), http.BadRequest, 'No binary aliases')
+
+ distro = this.request.key
+ enforce(distro, http.BadRequest, 'No distro in path')
+ if distro == '*':
+ lsb_id = None
+ lsb_release = None
+ elif '-' in this.request.key:
+ lsb_id, lsb_release = distro.split('-', 1)
+ else:
+ lsb_id = distro
+ lsb_release = None
+ releases = doc['releases']
+ resolves = releases.get('resolves') or {}
+ to_presolve = []
+
+ for repo in obs.get_repos():
+ if lsb_id and lsb_id != repo['lsb_id'] or \
+ lsb_release and lsb_release != repo['lsb_release']:
+ continue
+ # Make sure there are no alias overrides
+ if not lsb_id and repo['lsb_id'] in releases or \
+ not lsb_release and repo['name'] in releases:
+ continue
+ pkgs = sum([value.get(i, []) for i in ('binary', 'devel')], [])
+ version = None
+ try:
+ for arch in repo['arches']:
+ version = obs.resolve(repo['name'], arch, pkgs)['version']
+ except Exception, error:
+ _logger.warning('Failed to resolve %r on %s',
+ pkgs, repo['name'])
+ resolve = {'status': str(error)}
+ else:
+ to_presolve.append((repo['name'], pkgs))
+ resolve = {
+ 'version': parse_version(version),
+ 'packages': pkgs,
+ 'status': 'success',
+ }
+ resolves.setdefault(repo['name'], {}).update(resolve)
+
+ if to_presolve and _presolve_queue is not None:
+ _presolve_queue.put(to_presolve)
+ doc.post('releases', {'resolves': resolves})
+
+
_STABILITY_RATES = {
'insecure': 0,
'buggy': 1,
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index ac8a840..ee28e89 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -15,20 +15,28 @@
# pylint: disable-msg=W0611
+import os
+import re
+import json
+import time
+import shutil
import logging
-from os.path import join
+from os.path import join, exists
-from sugar_network import db
-from sugar_network.model import FrontRoutes, load_bundle
+from sugar_network import db, toolkit
+from sugar_network.model import FrontRoutes
from sugar_network.node import model
-from sugar_network.toolkit.router import ACL, File
-from sugar_network.toolkit.router import route, fallbackroute, preroute
+from sugar_network.toolkit.router import ACL, File, Request, Response, route
+from sugar_network.toolkit.router import fallbackroute, preroute, postroute
from sugar_network.toolkit.spec import parse_requires, parse_version
from sugar_network.toolkit.bundle import Bundle
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, coroutine, enforce
+from sugar_network.toolkit import http, coroutine, ranges, packets, enforce
+_GROUPED_DIFF_LIMIT = 1024
+_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$')
+
_logger = logging.getLogger('node.routes')
@@ -39,6 +47,10 @@ class NodeRoutes(db.Routes, FrontRoutes):
FrontRoutes.__init__(self)
self._guid = guid
self._auth = auth
+ self._batch_dir = join(self.volume.root, 'batch')
+
+ if not exists(self._batch_dir):
+ os.makedirs(self._batch_dir)
@property
def guid(self):
@@ -47,35 +59,49 @@ class NodeRoutes(db.Routes, FrontRoutes):
@preroute
def preroute(self, op):
request = this.request
+
if request.principal:
this.principal = request.principal
elif op.acl & ACL.AUTH:
this.principal = self._auth.logon(request)
else:
this.principal = None
- if op.acl & ACL.AUTHOR and request.guid:
- if not this.principal:
- this.principal = self._auth.logon(request)
- allowed = this.principal.admin
- if not allowed:
- if request.resource == 'user':
- allowed = (this.principal == request.guid)
- else:
- doc = self.volume[request.resource].get(request.guid)
- allowed = this.principal in doc['author']
+
+ if op.acl & ACL.AUTHOR and not this.principal.cap_author_override:
+ if request.resource == 'user':
+ allowed = (this.principal == request.guid)
+ else:
+ allowed = this.principal in this.resource['author']
enforce(allowed, http.Forbidden, 'Authors only')
- if op.acl & ACL.SUPERUSER:
- if not this.principal:
- this.principal = self._auth.logon(request)
- enforce(this.principal.admin, http.Forbidden, 'Superusers only')
+
+ if op.acl & ACL.AGG_AUTHOR and not this.principal.cap_author_override:
+ if this.resource.metadata[request.prop].acl & ACL.AUTHOR:
+ allowed = this.principal in this.resource['author']
+ elif request.key:
+ value = this.resource[request.prop].get(request.key)
+ allowed = value is None or this.principal in value['author']
+ else:
+ allowed = True
+ enforce(allowed, http.Forbidden, 'Authors only')
+
+ @postroute
+ def postroute(self, result, exception):
+ request = this.request
+ if not request.guid:
+ return result
+ pull = request.headers['pull']
+ if pull is None:
+ return result
+ this.response.content_type = 'application/octet-stream'
+ return model.diff_resource(pull)
+
+ @route('GET', cmd='logon', acl=ACL.AUTH)
+ def logon(self):
+ pass
@route('GET', cmd='whoami', mime_type='application/json')
def whoami(self):
- roles = []
- if this.principal and this.principal.admin:
- roles.append('root')
- return {'roles': roles,
- 'guid': this.principal,
+ return {'guid': this.principal,
'route': 'direct',
}
@@ -123,9 +149,9 @@ class NodeRoutes(db.Routes, FrontRoutes):
mime_type='application/json', acl=ACL.AUTH)
def submit_release(self, initial):
blob = self.volume.blobs.post(
- this.request.content_stream, this.request.content_type)
+ this.request.content, this.request.content_type)
try:
- context, release = load_bundle(blob, initial=initial)
+ context, release = model.load_bundle(blob, initial=initial)
except Exception:
self.volume.blobs.delete(blob.digest)
raise
@@ -147,5 +173,54 @@ class NodeRoutes(db.Routes, FrontRoutes):
solution = self.solve()
return self.volume.blobs.get(solution[this.request.guid]['blob'])
+ @route('GET', [None, None], cmd='diff')
+ def diff_resource(self):
+ return model.diff_resource(this.request.headers['ranges'])
+
+ @route('GET', [None], cmd='diff', mime_type='application/json')
+ def grouped_diff(self, key):
+ request = this.request
+ enforce(request.resource != 'user', http.BadRequest,
+ 'Not allowed for User resource')
+
+ if not key:
+ key = 'guid'
+ in_r = request.headers['ranges'] or [[1, None]]
+ diff = {}
+
+ for doc in self.volume[request.resource].diff(in_r):
+ out_r = diff.get(doc[key])
+ if out_r is None:
+ if len(diff) >= _GROUPED_DIFF_LIMIT:
+ break
+ out_r = diff[doc[key]] = []
+ ranges.include(out_r, doc['seqno'], doc['seqno'])
+ doc.diff(in_r, out_r)
+
+ return diff
+
+ @route('POST', cmd='apply', acl=ACL.AUTH)
+ def batched_post(self):
+ with toolkit.NamedTemporaryFile(dir=self._batch_dir,
+ prefix=this.principal, delete=False) as batch:
+ try:
+ shutil.copyfileobj(this.request.content, batch)
+ except Exception:
+ os.unlink(batch.name)
+ raise
+ with file(batch.name + '.meta', 'w') as f:
+ json.dump({'principal': this.principal.dump()}, f)
+ coroutine.spawn(model.apply_batch, batch.name)
+
+ def create(self):
+ if this.principal and this.principal.cap_create_with_guid:
+ guid = this.request.content.get('guid')
+ enforce(not guid or _GUID_RE.match(guid), http.BadRequest,
+ 'Malformed GUID')
+ else:
+ enforce('guid' not in this.request.content, http.BadRequest,
+ 'GUID should not be specified')
+ return db.Routes.create(self)
+
this.principal = None
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 074ae79..176defd 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -25,15 +25,14 @@ from sugar_network import toolkit
from sugar_network.model.context import Context
from sugar_network.model.post import Post
from sugar_network.model.report import Report
-from sugar_network.node.model import User
-from sugar_network.node import master_api
+from sugar_network.node import master_api, model
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, parcel, ranges, enforce
+from sugar_network.toolkit import http, packets, ranges, enforce
-RESOURCES = (User, Context, Post, Report)
+RESOURCES = (model.User, Context, Post, Report)
_logger = logging.getLogger('node.slave')
@@ -62,13 +61,13 @@ class SlaveRoutes(NodeRoutes):
def online_sync(self, no_pull=False):
conn = http.Connection(master_api.value)
response = conn.request('POST',
- data=parcel.encode(self._export(not no_pull), header={
+ data=packets.encode(self._export(not no_pull), header={
'from': self.guid,
'to': self._master_guid,
}),
params={'cmd': 'sync'},
headers={'Transfer-Encoding': 'chunked'})
- self._import(parcel.decode(response.raw))
+ self._import(packets.decode(response.raw))
@route('POST', cmd='offline_sync', acl=ACL.LOCAL)
def offline_sync(self, path):
@@ -82,7 +81,7 @@ class SlaveRoutes(NodeRoutes):
'event': 'sync_progress',
'progress': _('Reading sneakernet packages'),
})
- requests = self._import(parcel.decode_dir(path))
+ requests = self._import(packets.decode_dir(path))
this.broadcast({
'event': 'sync_progress',
@@ -91,7 +90,7 @@ class SlaveRoutes(NodeRoutes):
offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
if exists(offline_script):
shutil.copy(offline_script, path)
- parcel.encode_dir(requests + self._export(True), root=path, header={
+ packets.encode_dir(requests + self._export(True), root=path, header={
'from': self.guid,
'to': self._master_guid,
})
@@ -110,7 +109,7 @@ class SlaveRoutes(NodeRoutes):
sender = packet['from']
from_master = (sender == self._master_guid)
if packet.name == 'push':
- seqno, committed = this.volume.patch(packet)
+ seqno, committed = model.patch_volume(packet)
if seqno is not None:
if from_master:
with self._pull_r as r:
@@ -136,5 +135,5 @@ class SlaveRoutes(NodeRoutes):
export = []
if pull:
export.append(('pull', {'ranges': self._pull_r.value}, None))
- export.append(('push', None, self.volume.diff(self._push_r.value)))
+ export.append(('push', None, model.diff_volume(self._push_r.value)))
return export
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index 7585e29..bf80271 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -506,7 +506,7 @@ class Bin(object):
@property
def mtime(self):
if exists(self._path):
- return os.stat(self._path).st_mtime
+ return int(os.stat(self._path).st_mtime)
else:
return 0
@@ -650,7 +650,7 @@ class _NewFile(object):
dst_path = None
def __init__(self, **kwargs):
- self._file = tempfile.NamedTemporaryFile(delete=False, **kwargs)
+ self._file = NamedTemporaryFile(delete=False, **kwargs)
@property
def name(self):
@@ -666,6 +666,8 @@ class _NewFile(object):
def close(self):
self._file.close()
if exists(self.name):
+ if not exists(dirname(self.dst_path)):
+ os.makedirs(dirname(self.dst_path))
os.rename(self.name, self.dst_path)
def __enter__(self):
diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py
index 4a54975..e3a6173 100644
--- a/sugar_network/toolkit/coroutine.py
+++ b/sugar_network/toolkit/coroutine.py
@@ -303,25 +303,45 @@ class Spooler(object):
class _Local(object):
+ PROPERTY_NOT_SET = object()
+
def __init__(self):
self.attrs = set()
+ self.properties = {}
if hasattr(gevent.getcurrent(), 'local'):
current = gevent.getcurrent().local
for attr in current.attrs:
self.attrs.add(attr)
setattr(self, attr, getattr(current, attr))
+ self.properties = current.properties
class _LocalAccess(object):
def __getattr__(self, name):
- return getattr(gevent.getcurrent().local, name)
+ local = gevent.getcurrent().local
+ value = getattr(local, name)
+ if value is _Local.PROPERTY_NOT_SET:
+ value = local.properties[name]()
+ setattr(local, name, value)
+ return value
def __setattr__(self, name, value):
local = gevent.getcurrent().local
local.attrs.add(name)
- return setattr(local, name, value)
+ if value is None and name in local.properties:
+ value = _Local.PROPERTY_NOT_SET
+ setattr(local, name, value)
+
+ def add_property(self, name, getter):
+ local = gevent.getcurrent().local
+ local.properties[name] = getter
+ setattr(local, name, _Local.PROPERTY_NOT_SET)
+
+ def reset_property(self, name):
+ local = gevent.getcurrent().local
+ setattr(local, name, _Local.PROPERTY_NOT_SET)
class _Child(object):
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 0cbd535..4096b7c 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,7 +15,6 @@
import sys
import json
-import types
import logging
from os.path import join, dirname
@@ -110,12 +109,14 @@ class Connection(object):
_Session = None
- def __init__(self, url='', creds=None, max_retries=0, **session_args):
+ def __init__(self, url='', creds=None, max_retries=0, auth_request=None,
+ **session_args):
self.url = url
self.creds = creds
self._max_retries = max_retries
self._session_args = session_args
self._session = None
+ self._auth_request = auth_request
def __repr__(self):
return '<Connection url=%s>' % self.url
@@ -146,13 +147,17 @@ class Connection(object):
return self._decode_reply(reply)
def post(self, path_=None, data_=None, query_=None, **kwargs):
- reply = self.request('POST', path_, json.dumps(data_),
+ if data_ is not None:
+ data_ = json.dumps(data_)
+ reply = self.request('POST', path_, data_,
headers={'Content-Type': 'application/json'},
params=query_ or kwargs)
return self._decode_reply(reply)
def put(self, path_=None, data_=None, query_=None, **kwargs):
- reply = self.request('PUT', path_, json.dumps(data_),
+ if data_ is not None:
+ data_ = json.dumps(data_)
+ reply = self.request('PUT', path_, data_,
headers={'Content-Type': 'application/json'},
params=query_ or kwargs)
return self._decode_reply(reply)
@@ -182,8 +187,8 @@ class Connection(object):
f.close()
return reply
- def upload(self, path_=None, data_=None, **kwargs):
- reply = self.request('POST', path_, data_, params=kwargs)
+ def upload(self, path_=None, data=None, **kwargs):
+ reply = self.request('POST', path_, data, params=kwargs)
if reply.headers.get('Content-Type') == 'application/json':
return json.loads(reply.content)
else:
@@ -191,6 +196,11 @@ class Connection(object):
def request(self, method, path=None, data=None, headers=None, allowed=None,
params=None, **kwargs):
+ if data is not None and self._auth_request:
+ auth_request = self._auth_request
+ self._auth_request = None
+ self.request(**auth_request)
+
if self._session is None:
self._init()
@@ -209,6 +219,9 @@ class Connection(object):
reply = self._session.request(method, path, data=data,
headers=headers, params=params, **kwargs)
if reply.status_code == Unauthorized.status_code:
+ enforce(data is None,
+ 'Authorization is required '
+ 'but no way to resend posting data')
enforce(self.creds is not None, Unauthorized, 'No credentials')
challenge_ = reply.headers.get('www-authenticate')
if challenge and challenge == challenge_:
@@ -218,6 +231,7 @@ class Connection(object):
self.post(['user'], profile)
challenge = challenge_
self._session.headers.update(self.creds.logon(challenge))
+ self._auth_request = None
try_ = 0
elif reply.status_code == 200 or \
allowed and reply.status_code in allowed:
@@ -228,12 +242,12 @@ class Connection(object):
error = json.loads(content)['error']
except Exception:
# On non-JSONified fail response, assume that the error
- # was not sent by the application level server code, i.e.,
+ # was not sent by the application-level server code, i.e.,
# something happaned on low level, like connection abort.
# If so, try to resend request.
- if try_ <= self._max_retries and method in ('GET', 'HEAD'):
+ if try_ <= self._max_retries and data is None:
continue
- error = content or reply.headers.get('x-sn-error') or \
+ error = content or reply.headers.get('x-error') or \
'No error message provided'
cls = _FORWARD_STATUSES.get(reply.status_code, RuntimeError) \
or ConnectionError
@@ -242,24 +256,11 @@ class Connection(object):
return reply
def call(self, request, response=None):
- if request.content_type == 'application/json':
- request.content = json.dumps(request.content)
-
- headers = {}
- if request.content is not None:
- headers['content-type'] = \
- request.content_type or 'application/octet-stream'
- headers['content-length'] = str(len(request.content))
- elif request.content_stream is not None:
- headers['content-type'] = \
- request.content_type or 'application/octet-stream'
- # TODO Avoid reading the full content at once
- if isinstance(request.content_stream, types.GeneratorType):
- request.content = ''.join([i for i in request.content_stream])
- else:
- request.content = request.content_stream.read()
- headers['content-length'] = str(len(request.content))
+ headers = {
+ 'content-type': request.content_type or 'application/octet-stream',
+ }
for env_key, key in (
+ ('CONTENT_LENGTH', 'content-length'),
('HTTP_IF_MODIFIED_SINCE', 'if-modified-since'),
('HTTP_ACCEPT_LANGUAGE', 'accept-language'),
('HTTP_ACCEPT_ENCODING', 'accept-encoding'),
@@ -269,12 +270,18 @@ class Connection(object):
headers[key] = value
headers.update(request.headers)
+ data = None
+ if request.method in ('POST', 'PUT'):
+ if request.content_type == 'application/json':
+ data = json.dumps(request.content)
+ else:
+ data = request.content
+
path = request.path
while True:
- reply = self.request(request.method, path,
- data=request.content, params=request.query or request,
- headers=headers, allowed=_REDIRECT_CODES,
- allow_redirects=False)
+ reply = self.request(request.method, path, data=data,
+ params=request.query or request, headers=headers,
+ allowed=_REDIRECT_CODES, allow_redirects=False)
resend = reply.status_code in _REDIRECT_CODES
if response is not None:
if 'transfer-encoding' in reply.headers:
@@ -293,7 +300,10 @@ class Connection(object):
if request.method != 'HEAD':
if reply.headers.get('Content-Type') == 'application/json':
- return json.loads(reply.content)
+ if reply.content:
+ return json.loads(reply.content)
+ else:
+ return None
else:
return reply.raw
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/packets.py
index edbbf02..46bc223 100644
--- a/sugar_network/toolkit/parcel.py
+++ b/sugar_network/toolkit/packets.py
@@ -34,13 +34,13 @@ from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
DEFAULT_COMPRESSLEVEL = 6
-_FILENAME_SUFFIX = '.parcel'
+_FILENAME_SUFFIX = '.packet'
_RESERVED_DISK_SPACE = 1024 * 1024
_ZLIB_WBITS = 15
_ZLIB_WBITS_SIZE = 32768 # 2 ** 15
-_logger = logging.getLogger('parcel')
+_logger = logging.getLogger('packets')
def decode(stream, limit=None):
@@ -49,26 +49,19 @@ def decode(stream, limit=None):
if limit is not None:
limit -= 2
magic = stream.read(2)
- enforce(len(magic) == 2, http.BadRequest, 'Malformed parcel')
+ enforce(len(magic) == 2, http.BadRequest, 'Malformed packet')
if magic == '\037\213':
stream = _ZippedDecoder(stream, limit)
else:
stream = _Decoder(magic, stream, limit)
header = stream.read_record()
- packet = _DecodeIterator(stream)
- while True:
- packet.next()
- if packet.name == 'last':
- break
- packet.header.update(header)
- yield packet
+ return _DecodeIterator(stream, header)
-def encode(packets, limit=None, header=None, compresslevel=None,
- on_complete=None):
- _logger.debug('Encode %r packets limit=%r header=%r',
- packets, limit, header)
+def encode(items, limit=None, header=None, compresslevel=None,
+ on_complete=None, **kwargs):
+ _logger.debug('Encode %r limit=%r header=%r', items, limit, header)
if compresslevel is 0:
ostream = _Encoder()
@@ -82,71 +75,84 @@ def encode(packets, limit=None, header=None, compresslevel=None,
if limit is None:
limit = sys.maxint
if header is None:
- header = {}
+ header = kwargs
+ else:
+ header.update(kwargs)
chunk = ostream.write_record(header)
if chunk:
yield chunk
- for packet, props, content in packets:
- if props is None:
- props = {}
- props['packet'] = packet
- chunk = ostream.write_record(props)
- if chunk:
- yield chunk
-
- if content is None:
- continue
+ try:
+ items = iter(items)
+ record = next(items)
+ multisegments = type(record) in (tuple, list)
- content = iter(content)
- try:
- finalizing = False
- record = next(content)
- while True:
- if record is None:
- finalizing = True
- record = next(content)
- continue
- blob_len = 0
- if isinstance(record, File):
- blob_len = record.size
- chunk = record.meta
- else:
- chunk = record
- chunk = ostream.write_record(chunk,
- None if finalizing else limit - blob_len)
- if chunk is None:
- _logger.debug('Reach the encoding limit')
- on_complete = None
- if not isinstance(content, GeneratorType):
- raise StopIteration()
- finalizing = True
- record = content.throw(StopIteration())
- continue
+ while True:
+ if multisegments:
+ packet, props, content = record
+ if props is None:
+ props = {}
+ props['segment'] = packet
+ chunk = ostream.write_record(props)
if chunk:
yield chunk
- if blob_len:
- for chunk in record.iter_content():
- blob_len -= len(chunk)
- if not blob_len:
- chunk += '\n'
- chunk = ostream.write(chunk)
- if chunk:
- yield chunk
- enforce(blob_len == 0, EOFError, 'Blob size mismatch')
- record = next(content)
- except StopIteration:
- pass
+ if content:
+ content = iter(content)
+ record = next(content)
+ else:
+ content = iter([])
+ record = None
+ else:
+ content = items
+
+ try:
+ finalizing = False
+ while True:
+ if record is None:
+ finalizing = True
+ record = next(content)
+ continue
+ blob_len = 0
+ if isinstance(record, File):
+ blob_len = record.size
+ chunk = record.meta
+ else:
+ chunk = record
+ chunk = ostream.write_record(chunk,
+ None if finalizing else limit - blob_len)
+ if chunk is None:
+ _logger.debug('Reach the encoding limit')
+ on_complete = None
+ if not isinstance(content, GeneratorType):
+ raise StopIteration()
+ finalizing = True
+ record = content.throw(StopIteration())
+ continue
+ if chunk:
+ yield chunk
+ if blob_len:
+ for chunk in record.iter_content():
+ blob_len -= len(chunk)
+ if not blob_len:
+ chunk += '\n'
+ chunk = ostream.write(chunk)
+ if chunk:
+ yield chunk
+ enforce(blob_len == 0, EOFError, 'Blob size mismatch')
+ record = next(content)
+ except StopIteration:
+ pass
+ if multisegments:
+ record = next(items)
+ continue
+ break
+ finally:
if on_complete is not None:
on_complete()
-
- chunk = ostream.write_record({'packet': 'last'})
- if chunk:
- yield chunk
- chunk = ostream.flush()
- if chunk:
- yield chunk
+ chunk = ostream.flush()
+ if chunk:
+ yield chunk
def decode_dir(root, recipient=None, session=None):
@@ -154,18 +160,19 @@ def decode_dir(root, recipient=None, session=None):
for filename in files:
if not filename.endswith(_FILENAME_SUFFIX):
continue
- with file(join(root, filename), 'rb') as parcel:
- for packet in decode(parcel):
- if recipient is not None and packet['from'] == recipient:
- if session and packet['session'] == session:
- _logger.debug('Skip the same session %r parcel',
- parcel.name)
- else:
- _logger.debug('Remove outdated %r parcel',
- parcel.name)
- os.unlink(parcel.name)
- break
- yield packet
+ with file(join(root, filename), 'rb') as packets:
+ packet = decode(packets)
+ if recipient is not None and packet['from'] == recipient:
+ if session and packet['session'] == session:
+ _logger.debug('Skip the same session %r packet',
+ packets.name)
+ else:
+ _logger.debug('Remove outdated %r packet',
+ packets.name)
+ os.unlink(packets.name)
+ continue
+ for i in packet:
+ yield i
def encode_dir(packets, root=None, limit=None, path=None, sender=None,
@@ -182,36 +189,22 @@ def encode_dir(packets, root=None, limit=None, path=None, sender=None,
if sender is not None:
header['from'] = sender
- _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header)
+ _logger.debug('Creating %r packet limit=%s header=%r', path, limit, header)
- with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel:
+ with toolkit.NamedTemporaryFile(dir=dirname(path)) as f:
for chunk in encode(packets, limit, header):
- parcel.write(chunk)
+ f.write(chunk)
coroutine.dispatch()
- parcel.flush()
- os.fsync(parcel.fileno())
- os.rename(parcel.name, path)
+ f.flush()
+ os.fsync(f.fileno())
+ os.rename(f.name, path)
class _DecodeIterator(object):
- def __init__(self, stream):
+ def __init__(self, stream, header):
self._stream = stream
- self.header = {}
- self._name = None
- self._shift = True
-
- @property
- def name(self):
- return self._name
-
- def next(self):
- if self._shift:
- for __ in self:
- pass
- if self._name is None:
- raise EOFError()
- self._shift = True
+ self.header = header
def __repr__(self):
return '<Packet %r>' % self.header
@@ -219,38 +212,70 @@ class _DecodeIterator(object):
def __getitem__(self, key):
return self.header.get(key)
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
def __iter__(self):
while True:
record = self._stream.read_record()
if record is None:
- self._name = None
- raise EOFError()
- if 'packet' in record:
- self._name = record['packet'] or ''
- self.header = record
- self._shift = False
break
- blob_len = record.get('content-length')
- if blob_len is None:
- yield record
- continue
- blob_len = int(blob_len)
- with toolkit.NamedTemporaryFile() as blob:
- digest = hashlib.sha1()
- while blob_len:
- chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
- enforce(chunk, 'Blob size mismatch')
- blob.write(chunk)
- blob_len -= len(chunk)
- digest.update(chunk)
- blob.flush()
- yield File(blob.name, digest=digest.hexdigest(), meta=record)
+ if 'segment' in record:
+ while record is not None:
+ record.update(self.header)
+ segment = _SegmentIterator(self._stream, record)
+ yield segment
+ record = segment.next_segment
+ if record is not None:
+ continue
+ while True:
+ record = self._stream.read_record()
+ if record is None or 'segment' in record:
+ break
+ break
+ for i in self._process_record(record):
+ yield i
- def __enter__(self):
- return self
+ def _process_record(self, record):
+ blob_len = record.get('content-length')
+ if blob_len is None:
+ yield record
+ return
- def __exit__(self, exc_type, exc_value, traceback):
- pass
+ blob_len = int(blob_len)
+ with toolkit.NamedTemporaryFile() as blob:
+ digest = hashlib.sha1()
+ while blob_len:
+ chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
+ enforce(chunk, 'Blob size mismatch')
+ blob.write(chunk)
+ blob_len -= len(chunk)
+ digest.update(chunk)
+ blob.flush()
+ yield File(blob.name, digest=digest.hexdigest(), meta=record)
+
+
+class _SegmentIterator(_DecodeIterator):
+
+ next_segment = None
+
+ @property
+ def name(self):
+ return self.header['segment']
+
+ def __iter__(self):
+ while True:
+ record = self._stream.read_record()
+ if record is None:
+ break
+ if 'segment' in record:
+ self.next_segment = record
+ break
+ for i in self._process_record(record):
+ yield i
class _Encoder(object):
@@ -317,16 +342,21 @@ class _Decoder(object):
self._buffer = prefix
self._stream = stream
self._limit = limit
+ self._eof = False
def read_record(self):
while True:
parts = self._buffer.split('\n', 1)
if len(parts) == 1:
- if self._read(BUFFER_SIZE):
+ if self._read(BUFFER_SIZE) and not self._eof:
continue
- return None
- result, self._buffer = parts
+ result = parts[0]
+ self._buffer = ''
+ else:
+ result, self._buffer = parts
if not result:
+ if self._eof:
+ return None
continue
return json.loads(result)
@@ -342,7 +372,9 @@ class _Decoder(object):
if self._limit is not None:
size = min(size, self._limit)
chunk = self._stream.read(size)
- if chunk and self._limit is not None:
+ if not chunk:
+ self._eof = True
+ elif self._limit is not None:
self._limit -= len(chunk)
return self._decode(chunk)
@@ -365,7 +397,7 @@ class _ZippedDecoder(_Decoder):
'Unknown compression method')
enforce(ord(stream.read(1)) == 0, http.BadRequest,
'Gzip flags should be empty')
- stream.read(6) # Ignore the rest of header
+ stream.read(6) # Ignore the rest of ZIP header
def _decode(self, chunk):
if chunk:
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index f4b23ce..bd5da32 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -88,10 +88,9 @@ class ACL(object):
AUTH = 1 << 10
AUTHOR = 1 << 11
- SUPERUSER = 1 << 12
+ AGG_AUTHOR = 1 << 12
- LOCAL = 1 << 13
- CALC = 1 << 14
+ LOCAL = 1 << 14
NAMES = {
CREATE: 'Create',
@@ -107,8 +106,7 @@ class ACL(object):
class Request(dict):
def __init__(self, environ=None, method=None, path=None, cmd=None,
- content=None, content_stream=None, content_type=None,
- principal=None, **kwargs):
+ content=None, content_type=None, principal=None, **kwargs):
dict.__init__(self)
self.path = []
@@ -120,7 +118,6 @@ class Request(dict):
self._dirty_query = False
self._if_modified_since = _NOT_SET
self._accept_language = _NOT_SET
- self._content_stream = content_stream or _NOT_SET
self._content_type = content_type or _NOT_SET
if environ:
@@ -194,7 +191,17 @@ class Request(dict):
@property
def content(self):
- self.ensure_content()
+ if self._content is not _NOT_SET:
+ return self._content
+ stream = self.environ.get('wsgi.input')
+ if stream is None:
+ self._content = None
+ else:
+ stream = _ContentStream(stream, self.content_length)
+ if self.content_type == 'application/json':
+ self._content = json.load(stream)
+ else:
+ self._content = stream
return self._content
@content.setter
@@ -212,39 +219,41 @@ class Request(dict):
self.environ['CONTENT_LENGTH'] = str(value)
@property
- def content_stream(self):
- if self._content_stream is _NOT_SET:
- s = self.environ.get('wsgi.input')
- if s is None:
- self._content_stream = None
- else:
- self._content_stream = _ContentStream(s, self.content_length)
- return self._content_stream
-
- @content_stream.setter
- def content_stream(self, value):
- self._content_stream = value
-
- @property
def resource(self):
if self.path:
return self.path[0]
+ @resource.setter
+ def resource(self, value):
+ self.path[0] = value
+
@property
def guid(self):
if len(self.path) > 1:
return self.path[1]
+ @guid.setter
+ def guid(self, value):
+ self.path[1] = value
+
@property
def prop(self):
if len(self.path) > 2:
return self.path[2]
+ @prop.setter
+ def prop(self, value):
+ self.path[2] = value
+
@property
def key(self):
if len(self.path) > 3:
return self.path[3]
+ @key.setter
+ def key(self, value):
+ self.path[3] = value
+
@property
def static_prefix(self):
http_host = self.environ.get('HTTP_HOST')
@@ -298,16 +307,6 @@ class Request(dict):
else:
existing_value = self[key] = [existing_value, value]
- def ensure_content(self):
- if self._content is not _NOT_SET:
- return
- if self.content_stream is None:
- self._content = None
- elif self.content_type == 'application/json':
- self._content = json.load(self.content_stream)
- else:
- self._content = self.content_stream.read()
-
def __repr__(self):
return '<Request method=%s path=%r cmd=%s query=%r>' % \
(self.method, self.path, self.cmd, dict(self))
@@ -539,7 +538,6 @@ class Router(object):
if route_.mime_type == 'text/event-stream' and \
self._allow_spawn and 'spawn' in request:
_logger.debug('Spawn event stream for %r', request)
- request.ensure_content()
coroutine.spawn(self._event_stream, request, result)
result = None
elif route_.mime_type and 'content-type' not in response:
@@ -617,29 +615,23 @@ class Router(object):
response.content_type = 'application/json'
streamed_content = isinstance(content, types.GeneratorType)
-
- if request.method == 'HEAD':
- streamed_content = False
- content = None
- elif js_callback:
+ if js_callback or response.content_type == 'application/json':
if streamed_content:
content = ''.join(content)
streamed_content = False
- content = '%s(%s);' % (js_callback, json.dumps(content))
- response.content_length = len(content)
- elif not streamed_content:
- if response.content_type == 'application/json':
+ else:
content = json.dumps(content)
- response.content_length = len(content)
- elif 'content-length' not in response:
- response.content_length = len(content) if content else 0
- if request.method == 'HEAD' and content is not None:
- _logger.warning('Content from HEAD response is ignored')
+ if js_callback:
+ content = '%s(%s);' % (js_callback, content)
+ if request.method == 'HEAD':
+ streamed_content = False
content = None
- _save_cookie(response, 'sugar_network_node', this.cookie)
+ elif not streamed_content:
+ response.content_length = len(content) if content else 0
_logger.trace('%s call: request=%s response=%r content=%r',
self, request.environ, response, repr(content)[:256])
+ _save_cookie(response, 'sugar_network_node', this.cookie)
start_response(response.status, response.items())
if streamed_content:
@@ -872,6 +864,13 @@ class _Route(object):
def __init__(self, callback, method, path, cmd, mime_type=None, acl=0,
arguments=None):
+ enforce(acl ^ ACL.AUTHOR or acl & ACL.AUTH,
+ 'ACL.AUTHOR without ACL.AUTH')
+ enforce(acl ^ ACL.AUTHOR or len(path) >= 2,
+ 'ACL.AUTHOR requires longer path')
+ enforce(acl ^ ACL.AGG_AUTHOR or len(path) >= 3,
+ 'ACL.AGG_AUTHOR requires longer path')
+
self.op = (method, cmd)
self.callback = callback
self.method = method
diff --git a/sugar_network/toolkit/spec.py b/sugar_network/toolkit/spec.py
index b3f83e9..279e748 100644
--- a/sugar_network/toolkit/spec.py
+++ b/sugar_network/toolkit/spec.py
@@ -174,7 +174,7 @@ def parse_requires(requires):
return result
-def ensure(version, cond):
+def ensure_version(version, cond):
if cond:
for op, cond_version in cond:
if op == [0]: