Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2014-03-24 11:55:25 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2014-03-24 11:55:25 (GMT)
commit6ec16441c7c133c55385613f1e430c5ea37af632 (patch)
tree51870c8fa43a3bcabc6918206b3fc5265a91300a /sugar_network
parent40021927aa1815dd54e2e7839a46e5bd1ae8c7b3 (diff)
Fix basic client routes
Diffstat (limited to 'sugar_network')
-rw-r--r--sugar_network/client/__init__.py7
-rw-r--r--sugar_network/client/injector.py113
-rw-r--r--sugar_network/client/journal.py13
-rw-r--r--sugar_network/client/model.py36
-rw-r--r--sugar_network/client/routes.py293
-rw-r--r--sugar_network/db/directory.py36
-rw-r--r--sugar_network/db/metadata.py3
-rw-r--r--sugar_network/db/resource.py62
-rw-r--r--sugar_network/db/routes.py130
-rw-r--r--sugar_network/db/volume.py24
-rw-r--r--sugar_network/model/__init__.py30
-rw-r--r--sugar_network/model/context.py46
-rw-r--r--sugar_network/model/post.py3
-rw-r--r--sugar_network/model/report.py3
-rw-r--r--sugar_network/model/routes.py1
-rw-r--r--sugar_network/node/master.py10
-rw-r--r--sugar_network/node/model.py22
-rw-r--r--sugar_network/node/routes.py16
-rw-r--r--sugar_network/node/slave.py23
-rw-r--r--sugar_network/toolkit/coroutine.py5
-rw-r--r--sugar_network/toolkit/http.py3
-rw-r--r--sugar_network/toolkit/router.py36
22 files changed, 481 insertions, 434 deletions
diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py
index 446795a..648d418 100644
--- a/sugar_network/client/__init__.py
+++ b/sugar_network/client/__init__.py
@@ -81,11 +81,6 @@ hub_root = Option(
'from file:// url',
default='/usr/share/sugar-network/hub')
-layers = Option(
- 'comma separated list of layers to restrict Sugar Network content by',
- default=[], type_cast=Option.list_cast, type_repr=Option.list_repr,
- name='layers')
-
discover_node = Option(
'discover nodes in local network instead of using --api',
default=False, type_cast=Option.bool_cast,
@@ -179,7 +174,7 @@ def Connection(url=None, **args):
def IPCConnection():
return http.Connection(
- api='http://127.0.0.1:%s' % ipc_port.value,
+ 'http://127.0.0.1:%s' % ipc_port.value,
# Online ipc->client->node request might fail if node connection
# is lost in client process, so, re-send ipc request immediately
# to retrive data from client in offline mode without propagating
diff --git a/sugar_network/client/injector.py b/sugar_network/client/injector.py
index 12baf51..6d0c420 100644
--- a/sugar_network/client/injector.py
+++ b/sugar_network/client/injector.py
@@ -27,6 +27,7 @@ from sugar_network import toolkit
from sugar_network.client import packagekit, journal, profile_path
from sugar_network.toolkit.spec import format_version
from sugar_network.toolkit.bundle import Bundle
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import lsb_release, coroutine, i18n, pylru, http
from sugar_network.toolkit import enforce
@@ -47,6 +48,7 @@ class Injector(object):
limit_bytes, limit_percent)
self._api = None
self._checkins = toolkit.Bin(join(root, 'checkins'), {})
+ self._inprogress = {}
for dir_name in ('solutions', 'releases'):
dir_path = join(root, dir_name)
@@ -81,8 +83,8 @@ class Injector(object):
yield {'event': 'launch', 'state': 'init'}
releases = []
- acquired = []
checkedin = {}
+ inprogress = []
environ = {}
def acquire(ctx):
@@ -92,8 +94,8 @@ class Injector(object):
if ctx in self._checkins:
checkedin[ctx] = (self.api, stability, self.seqno)
else:
- _logger.debug('Acquire %r', ctx)
- acquired.extend(solution.values())
+ inprogress.append((ctx, solution))
+ self._progress_in(ctx)
releases.extend(solution.values())
release = solution[ctx]
return release, self._pool.path(release['blob'])
@@ -135,9 +137,9 @@ class Injector(object):
yield environ
status = child.wait()
finally:
- if acquired:
- _logger.debug('Release acquired contexts')
- self._pool.push(acquired)
+ for ctx, solution in inprogress:
+ self._progress_out(ctx, True)
+ self._pool.push(solution.values())
if checkedin:
with self._checkins as checkins:
@@ -148,19 +150,22 @@ class Injector(object):
yield {'event': 'launch', 'state': 'exit'}
def checkin(self, context, stability='stable'):
- if context in self._checkins:
- _logger.debug('Refresh %r checkin', context)
- else:
- _logger.debug('Checkin %r', context)
- yield {'event': 'checkin', 'state': 'solve'}
- solution = self._solve(context, stability)
- for event in self._download(solution.values()):
- event['event'] = 'checkin'
- yield event
- self._pool.pop(solution.values())
- with self._checkins as checkins:
- checkins[context] = (self.api, stability, self.seqno)
- yield {'event': 'checkin', 'state': 'ready'}
+ self._progress_in(context)
+ try:
+ yield {'event': 'checkin', 'state': 'solve'}
+ solution = self._solve(context, stability)
+ for event in self._download(solution.values()):
+ event['event'] = 'checkin'
+ yield event
+ self._pool.pop(solution.values())
+ with self._checkins as checkins:
+ checkins[context] = (self.api, stability, self.seqno)
+ yield {'event': 'checkin', 'state': 'ready'}
+ directory = this.volume['context']
+ pins = list(set(directory[context]['pins']) | set(['checkin']))
+ directory.update(context, {'pins': pins})
+ finally:
+ self._progress_out(context)
def checkout(self, context):
if context not in self._checkins:
@@ -171,8 +176,51 @@ class Injector(object):
self._pool.push(solution.values())
with self._checkins as checkins:
del checkins[context]
+ directory = this.volume['context']
+ pins = list(set(directory[context]['pins']) - set(['checkin']))
+ directory.update(context, {'pins': pins})
+ self._notify(context)
return True
+ def pins(self, context, stability='stable'):
+ result = []
+ if self.api and context in self._checkins:
+ api, s, seqno = self._checkins[context]
+ if api != self.api or s != stability or seqno != self.seqno:
+ result.append('stale')
+ if self._inprogress.get(context):
+ result.append('inprogress')
+ return result
+
+ def _notify(self, context, force=False):
+ if not force and not self.api:
+ return
+ doc = this.volume['context'][context]
+ pins = doc.repr('pins') if doc.exists else self.pins(context)
+ this.localcast({
+ 'event': 'update',
+ 'resource': 'context',
+ 'guid': context,
+ 'props': {'pins': pins},
+ })
+
+ def _progress_in(self, context):
+ progress = self._inprogress.setdefault(context, 0)
+ self._inprogress[context] = progress + 1
+ if not progress:
+ _logger.debug('%r is in-progress', context)
+ self._notify(context, True)
+
+ def _progress_out(self, context, force=False):
+ progress = self._inprogress.get(context)
+ if not progress:
+ _logger.warn('Progress counter broken for %r', context)
+ return
+ self._inprogress[context] = progress - 1
+ if progress == 1:
+ _logger.debug('%r is not in-progress', context)
+ self._notify(context, force)
+
def _solve(self, context, stability):
path = join(self._root, 'solutions', context)
solution = None
@@ -193,7 +241,8 @@ class Injector(object):
_logger.debug('Reuse cached %r solution in offline', context)
if not solution:
- enforce(self.api, 'Cannot solve in offline')
+ enforce(self.api, http.ServiceUnavailable,
+ 'Not available in offline')
_logger.debug('Solve %r', context)
solution = self._api.get(['context', context], cmd='solve',
stability=stability, lsb_id=lsb_release.distributor_id(),
@@ -422,21 +471,21 @@ def _exec(context, release, path, args, environ):
os.chdir(path)
- environ = os.environ
- environ['PATH'] = ':'.join([
+ env = os.environ
+ env['PATH'] = ':'.join([
join(path, 'activity'),
join(path, 'bin'),
- environ['PATH'],
+ env['PATH'],
])
- environ['PYTHONPATH'] = path + ':' + environ.get('PYTHONPATH', '')
- environ['SUGAR_BUNDLE_PATH'] = path
- environ['SUGAR_BUNDLE_ID'] = context
- environ['SUGAR_BUNDLE_NAME'] = i18n.decode(release['title'])
- environ['SUGAR_BUNDLE_VERSION'] = format_version(release['version'])
- environ['SUGAR_ACTIVITY_ROOT'] = datadir
- environ['SUGAR_LOCALEDIR'] = join(path, 'locale')
-
- os.execvpe(args[0], args, environ)
+ env['PYTHONPATH'] = path + ':' + env.get('PYTHONPATH', '')
+ env['SUGAR_BUNDLE_PATH'] = path
+ env['SUGAR_BUNDLE_ID'] = context
+ env['SUGAR_BUNDLE_NAME'] = i18n.decode(release['title'])
+ env['SUGAR_BUNDLE_VERSION'] = format_version(release['version'])
+ env['SUGAR_ACTIVITY_ROOT'] = datadir
+ env['SUGAR_LOCALEDIR'] = join(path, 'locale')
+
+ os.execvpe(args[0], args, env)
except BaseException:
logging.exception('Failed to execute %r args=%r', release, args)
finally:
diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py
index 0dcae12..6a8f5ed 100644
--- a/sugar_network/client/journal.py
+++ b/sugar_network/client/journal.py
@@ -19,8 +19,8 @@ import logging
from shutil import copyfileobj
from tempfile import NamedTemporaryFile
-from sugar_network import client, toolkit
-from sugar_network.toolkit.router import route, Request
+from sugar_network import client
+from sugar_network.toolkit.router import route, Request, File
from sugar_network.toolkit import enforce
@@ -105,14 +105,15 @@ class Routes(object):
@route('GET', ['journal', None, 'preview'])
def journal_get_preview(self, request, response):
- return toolkit.File(_prop_path(request.guid, 'preview'), {
- 'mime_type': 'image/png',
+ return File(_prop_path(request.guid, 'preview'), meta={
+ 'content-type': 'image/png',
})
@route('GET', ['journal', None, 'data'])
def journal_get_data(self, request, response):
- return toolkit.File(_ds_path(request.guid, 'data'), {
- 'mime_type': get(request.guid, 'mime_type') or 'application/octet',
+ return File(_ds_path(request.guid, 'data'), meta={
+ 'content-type': get(request.guid, 'mime_type') or
+ 'application/octet',
})
@route('GET', ['journal', None, None], mime_type='application/json')
diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py
new file mode 100644
index 0000000..6207af2
--- /dev/null
+++ b/sugar_network/client/model.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+
+from sugar_network import db
+from sugar_network.model.user import User
+from sugar_network.model.post import Post
+from sugar_network.model.report import Report
+from sugar_network.model import context as base_context
+from sugar_network.toolkit.coroutine import this
+
+
+_logger = logging.getLogger('client.model')
+
+
+class Context(base_context.Context):
+
+ @db.indexed_property(db.List, prefix='RP', default=[])
+ def pins(self, value):
+ return value + this.injector.pins(self.guid)
+
+
+RESOURCES = (User, Context, Post, Report)
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py
index 50d8632..c4b645d 100644
--- a/sugar_network/client/routes.py
+++ b/sugar_network/client/routes.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -17,66 +17,53 @@ import os
import logging
from base64 import b64encode
from httplib import IncompleteRead
-from zipfile import ZipFile, ZIP_DEFLATED
-from os.path import join, basename
+from os.path import join
from sugar_network import db, client, node, toolkit, model
-from sugar_network.client import journal, releases
-from sugar_network.node.slave import SlaveRoutes
-from sugar_network.toolkit import netlink, mountpoints
+from sugar_network.client import journal
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit.router import ACL, Request, Response, Router
from sugar_network.toolkit.router import route, fallbackroute
-from sugar_network.toolkit import zeroconf, coroutine, http, exception, enforce
+from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel
+from sugar_network.toolkit import lsb_release, exception, enforce
-# Top-level directory name to keep SN data on mounted devices
-_SN_DIRNAME = 'sugar-network'
# Flag file to recognize a directory as a synchronization directory
-_SYNC_DIRNAME = 'sugar-network-sync'
_RECONNECT_TIMEOUT = 3
_RECONNECT_TIMEOUT_MAX = 60 * 15
-_LOCAL_LAYERS = frozenset(['local', 'clone', 'favorite'])
_logger = logging.getLogger('client.routes')
-class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
+class ClientRoutes(model.FrontRoutes, journal.Routes):
- def __init__(self, home_volume, api_url=None, no_subscription=False):
+ def __init__(self, home_volume, no_subscription=False):
model.FrontRoutes.__init__(self)
- releases.Routes.__init__(self, home_volume)
journal.Routes.__init__(self)
+ this.localcast = this.broadcast
+
self._local = _LocalRoutes(home_volume)
self._inline = coroutine.Event()
self._inline_job = coroutine.Pool()
self._remote_urls = []
self._node = None
- self._jobs = coroutine.Pool()
+ self._connect_jobs = coroutine.Pool()
self._no_subscription = no_subscription
- self._server_mode = not api_url
- self._api_url = api_url
self._auth = _Auth()
- if not client.delayed_start.value:
- self.connect()
-
- def connect(self):
- self._got_offline(force=True)
- if self._server_mode:
- enforce(not client.login.value)
- mountpoints.connect(_SN_DIRNAME,
- self._found_mount, self._lost_mount)
+ def connect(self, api=None):
+ if self._connect_jobs:
+ return
+ self._got_offline()
+ if not api:
+ self._connect_jobs.spawn(self._discover_node)
else:
- if client.discover_server.value:
- self._jobs.spawn(self._discover_node)
- else:
- self._remote_urls.append(self._api_url)
- self._jobs.spawn(self._wait_for_connectivity)
+ self._remote_urls.append(api)
+ self._connect_jobs.spawn(self._wait_for_connectivity)
def close(self):
- self._jobs.kill()
+ self._connect_jobs.kill()
self._got_offline()
self._local.volume.close()
@@ -132,63 +119,89 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
return result
@route('GET', [None],
- arguments={
- 'offset': int,
- 'limit': int,
- 'reply': ('guid',),
- 'layer': list,
- },
+ arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
mime_type='application/json')
- def find(self, request, response, layer):
- if set(request.get('layer', [])) & set(['favorite', 'clone']):
+ def find(self, request, response):
+ if not self._inline.is_set() or 'pins' in request:
return self._local.call(request, response)
reply = request.setdefault('reply', ['guid'])
- if 'layer' not in reply:
+ if 'pins' not in reply:
return self.fallback(request, response)
if 'guid' not in reply:
- # Otherwise there is no way to mixin local `layer`
+ # Otherwise there is no way to mixin `pins`
reply.append('guid')
result = self.fallback(request, response)
directory = self._local.volume[request.resource]
for item in result['result']:
- if directory.exists(item['guid']):
- existing_layer = directory.get(item['guid'])['layer']
- item['layer'][:] = set(item['layer']) | set(existing_layer)
+ doc = directory[item['guid']]
+ if doc.exists:
+ item['pins'] += doc.repr('pins')
return result
@route('GET', [None, None], mime_type='application/json')
def get(self, request, response):
- if self._local.volume[request.resource].exists(request.guid):
+ if self._local.volume[request.resource][request.guid].exists:
return self._local.call(request, response)
else:
return self.fallback(request, response)
@route('GET', [None, None, None], mime_type='application/json')
def get_prop(self, request, response):
- if self._local.volume[request.resource].exists(request.guid):
+ if self._local.volume[request.resource][request.guid].exists:
return self._local.call(request, response)
else:
return self.fallback(request, response)
@route('POST', ['report'], cmd='submit', mime_type='text/event-stream')
- def submit_report(self, request, response):
- logs = request.content.pop('logs')
+ def submit_report(self):
+ props = this.request.content
+ logs = props.pop('logs')
+ props['uname'] = os.uname()
+ props['lsb_release'] = {
+ 'distributor_id': lsb_release.distributor_id(),
+ 'release': lsb_release.release(),
+ }
guid = self.fallback(method='POST', path=['report'],
- content=request.content, content_type='application/json')
- if logs:
- with toolkit.TemporaryFile() as tmpfile:
- with ZipFile(tmpfile, 'w', ZIP_DEFLATED) as zipfile:
- for path in logs:
- zipfile.write(path, basename(path))
- tmpfile.seek(0)
- self.fallback(method='PUT', path=['report', guid, 'data'],
- content_stream=tmpfile, content_type='application/zip')
+ content=props, content_type='application/json')
+ for logfile in logs:
+ with file(logfile) as f:
+ self.fallback(method='POST', path=['report', guid, 'logs'],
+ content_stream=f, content_type='text/plain')
yield {'event': 'done', 'guid': guid}
+ @route('GET', ['context', None], cmd='launch', arguments={'args': list},
+ mime_type='text/event-stream')
+ def launch(self):
+ return this.injector.launch(this.request.guid, **this.request)
+
+ @route('PUT', ['context', None], cmd='checkin',
+ mime_type='text/event-stream')
+ def put_checkin(self):
+ self._checkin_context()
+ for event in this.injector.checkin(this.request.guid):
+ yield event
+
+ @route('DELETE', ['context', None], cmd='checkin')
+ def delete_checkin(self, request):
+ this.injector.checkout(this.request.guid)
+ self._checkout_context()
+
+ @route('PUT', ['context', None], cmd='favorite')
+ def put_favorite(self, request):
+ self._checkin_context('favorite')
+
+ @route('DELETE', ['context', None], cmd='favorite')
+ def delete_favorite(self, request):
+ self._checkout_context('favorite')
+
+ @route('GET', cmd='recycle')
+ def recycle(self):
+ return this.injector.recycle()
+
@fallbackroute()
def fallback(self, request=None, response=None, **kwargs):
if request is None:
@@ -199,8 +212,6 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
if not self._inline.is_set():
return self._local.call(request, response)
- if client.layers.value and request.resource in ('context', 'release'):
- request.add('layer', *client.layers.value)
request.principal = self._auth.login
try:
reply = self._node.call(request, response)
@@ -217,21 +228,23 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
self._restart_online()
return self._local.call(request, response)
- def _got_online(self):
+ def _got_online(self, url):
enforce(not self._inline.is_set())
_logger.debug('Got online on %r', self._node)
self._inline.set()
+ self._local.volume.mute = True
+ this.injector.api = url
this.localcast({'event': 'inline', 'state': 'online'})
- def _got_offline(self, force=False):
- if not force and not self._inline.is_set():
- return
+ def _got_offline(self):
if self._node is not None:
self._node.close()
if self._inline.is_set():
_logger.debug('Got offline on %r', self._node)
- this.localcast({'event': 'inline', 'state': 'offline'})
self._inline.clear()
+ self._local.volume.mute = False
+ this.injector.api = None
+ this.localcast({'event': 'inline', 'state': 'offline'})
def _restart_online(self):
_logger.debug('Lost %r connection, try to reconnect in %s seconds',
@@ -256,10 +269,8 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
def pull_events():
for event in self._node.subscribe():
- if event.get('resource') == 'release':
- mtime = event.get('mtime')
- if mtime:
- self.invalidate_solutions(mtime)
+ if event.get('event') == 'release':
+ this.injector.seqno = event['seqno']
this.broadcast(event)
def handshake(url):
@@ -267,16 +278,13 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
self._node = client.Connection(url, auth=self._auth)
status = self._node.get(cmd='status')
self._auth.allow_basic_auth = (status.get('level') == 'master')
- """
- TODO switch to seqno
- impl_info = status['resources'].get('release')
- if impl_info:
- self.invalidate_solutions(impl_info['mtime'])
- """
+ seqno = status.get('seqno')
+ if seqno and 'releases' in seqno:
+ this.injector.seqno = seqno['releases']
if self._inline.is_set():
_logger.info('Reconnected to %r node', url)
else:
- self._got_online()
+ self._got_online(url)
def connect():
timeout = _RECONNECT_TIMEOUT
@@ -307,40 +315,32 @@ class ClientRoutes(model.FrontRoutes, releases.Routes, journal.Routes):
self._inline_job.kill()
self._inline_job.spawn_later(timeout, connect)
- def _found_mount(self, root):
- if self._inline.is_set():
- _logger.debug('Found %r node mount but %r is already active',
- root, self._node.volume.root)
- return
-
- _logger.debug('Found %r node mount', root)
-
- db_path = join(root, _SN_DIRNAME, 'db')
- node.data_root.value = db_path
- node.stats_root.value = join(root, _SN_DIRNAME, 'stats')
- node.files_root.value = join(root, _SN_DIRNAME, 'files')
- volume = db.Volume(db_path, model.RESOURCES)
-
- if not volume['user'].exists(self._auth.login):
- profile = self._auth.profile()
- profile['guid'] = self._auth.login
- volume['user'].create(profile)
-
- self._node = _NodeRoutes(join(db_path, 'node'), volume)
- self._jobs.spawn(volume.populate)
-
- logging.info('Start %r node on %s port', volume.root, node.port.value)
- server = coroutine.WSGIServer(('0.0.0.0', node.port.value), self._node)
- self._inline_job.spawn(server.serve_forever)
- self._got_online()
-
- def _lost_mount(self, root):
- if not self._inline.is_set() or \
- not self._node.volume.root.startswith(root):
+ def _checkin_context(self, pin=None):
+ context = this.volume['context'][this.request.guid]
+ if not context.exists:
+ enforce(self.inline(), http.ServiceUnavailable,
+ 'Not available in offline')
+ _logger.debug('Checkin %r context', context.guid)
+ clone = self.fallback(
+ method='GET', path=['context', context.guid], cmd='clone')
+ this.volume.patch(next(parcel.decode(clone)))
+ pins = context['pins']
+ if pin and pin not in pins:
+ this.volume['context'].update(context.guid, {'pins': pins + [pin]})
+
+ def _checkout_context(self, pin=None):
+ directory = this.volume['context']
+ context = directory[this.request.guid]
+ if not context.exists:
return
- _logger.debug('Lost %r node mount', root)
- self._inline_job.kill()
- self._got_offline()
+ pins = set(context.repr('pins'))
+ if pin:
+ pins -= set([pin])
+ if not self._inline.is_set() or pins:
+ if pin:
+ directory.update(context.guid, {'pins': list(pins)})
+ else:
+ directory.delete(context.guid)
class CachedClientRoutes(ClientRoutes):
@@ -351,16 +351,16 @@ class CachedClientRoutes(ClientRoutes):
self._push_job = coroutine.Pool()
ClientRoutes.__init__(self, home_volume, api_url, no_subscription)
- def _got_online(self):
- ClientRoutes._got_online(self)
+ def _got_online(self, url):
+ ClientRoutes._got_online(self, url)
self._push_job.spawn(self._push)
- def _got_offline(self, force=True):
+ def _got_offline(self):
self._push_job.kill()
- ClientRoutes._got_offline(self, force)
+ ClientRoutes._got_offline(self)
def _push(self):
- # TODO should work using regular pull/push
+ # TODO should work using regular diff
return
@@ -384,14 +384,12 @@ class CachedClientRoutes(ClientRoutes):
_logger.debug('Check %r local cache to push', res)
- for guid, patch in volume[res].diff(self._push_seq, layer='local'):
+ for guid, patch in volume[res].diff(self._push_seq):
diff = {}
diff_seq = toolkit.Sequence()
post_requests = []
for prop, meta, seqno in patch:
value = meta['value']
- if prop == 'layer':
- value = list(set(value) - _LOCAL_LAYERS)
diff[prop] = value
diff_seq.include(seqno, seqno)
if not diff:
@@ -436,67 +434,6 @@ class _LocalRoutes(db.Routes, Router):
db.Routes.__init__(self, volume)
Router.__init__(self, self)
- def on_create(self, request, props):
- props['layer'] = tuple(props['layer']) + ('local',)
- db.Routes.on_create(self, request, props)
-
-
-class _NodeRoutes(SlaveRoutes, Router):
-
- def __init__(self, key_path, volume):
- SlaveRoutes.__init__(self, key_path, volume)
- Router.__init__(self, self)
-
- self.api_url = 'http://127.0.0.1:%s' % node.port.value
- self._mounts = toolkit.Pool()
- self._jobs = coroutine.Pool()
-
- mountpoints.connect(_SYNC_DIRNAME,
- self.__found_mountcb, self.__lost_mount_cb)
-
- def close(self):
- self.volume.close()
-
- def __repr__(self):
- return '<LocalNode path=%s api_url=%s>' % \
- (self.volume.root, self.api_url)
-
- def _sync_mounts(self):
- this.localcast({'event': 'sync_start'})
-
- for mountpoint in self._mounts:
- this.localcast({'event': 'sync_next', 'path': mountpoint})
- try:
- self._offline_session = self._offline_sync(
- join(mountpoint, _SYNC_DIRNAME),
- **(self._offline_session or {}))
- except Exception, error:
- _logger.exception('Failed to complete synchronization')
- this.localcast({'event': 'sync_abort', 'error': str(error)})
- self._offline_session = None
- raise
-
- if self._offline_session is None:
- _logger.debug('Synchronization completed')
- this.localcast({'event': 'sync_complete'})
- else:
- _logger.debug('Postpone synchronization with %r session',
- self._offline_session)
- this.localcast({'event': 'sync_paused'})
-
- def __found_mountcb(self, path):
- self._mounts.add(path)
- if self._jobs:
- _logger.debug('Found %r sync mount, pool it', path)
- else:
- _logger.debug('Found %r sync mount, start synchronization', path)
- self._jobs.spawn(self._sync_mounts)
-
- def __lost_mount_cb(self, path):
- if self._mounts.remove(path) == toolkit.Pool.ACTIVE:
- _logger.warning('%r was unmounted, break synchronization', path)
- self._jobs.kill()
-
class _ResponseStream(object):
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py
index 9ebf907..7fe127d 100644
--- a/sugar_network/db/directory.py
+++ b/sugar_network/db/directory.py
@@ -20,8 +20,7 @@ from os.path import exists, join
from sugar_network import toolkit
from sugar_network.db.storage import Storage
from sugar_network.db.metadata import Metadata, Guid
-from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, exception, enforce
+from sugar_network.toolkit import exception, enforce
# To invalidate existed index on stcuture changes
@@ -32,7 +31,7 @@ _logger = logging.getLogger('db.directory')
class Directory(object):
- def __init__(self, root, resource, index_class, seqno):
+ def __init__(self, root, resource, index_class, seqno, broadcast):
"""
:param index_class:
what class to use to access to indexes, for regular casses
@@ -52,6 +51,7 @@ class Directory(object):
self._seqno = seqno
self._storage = None
self._index = None
+ self._broadcast = broadcast
self._open()
@@ -92,10 +92,10 @@ class Directory(object):
guid = props['guid'] = toolkit.uuid()
_logger.debug('Create %s[%s]: %r', self.metadata.name, guid, props)
event = {'event': 'create', 'guid': guid}
- self._index.store(guid, props, self._prestore, self._broadcast, event)
+ self._index.store(guid, props, self._prestore, self.broadcast, event)
return guid
- def update(self, guid, props):
+ def update(self, guid, props, event='update'):
"""Update properties for an existing document.
:param guid:
@@ -105,8 +105,10 @@ class Directory(object):
"""
_logger.debug('Update %s[%s]: %r', self.metadata.name, guid, props)
- event = {'event': 'update', 'guid': guid}
- self._index.store(guid, props, self._prestore, self._broadcast, event)
+ event = {'event': event, 'guid': guid}
+ if event['event'] == 'update':
+ event['props'] = props.copy()
+ self._index.store(guid, props, self._prestore, self.broadcast, event)
def delete(self, guid):
"""Delete document.
@@ -119,15 +121,9 @@ class Directory(object):
event = {'event': 'delete', 'guid': guid}
self._index.delete(guid, self._postdelete, guid, event)
- def exists(self, guid):
- return self._storage.get(guid).consistent
-
def get(self, guid):
cached_props = self._index.get_cached(guid)
record = self._storage.get(guid)
- enforce(cached_props or record.exists, http.NotFound,
- 'Resource %r does not exist in %r',
- guid, self.metadata.name)
return self.resource(guid, record, cached_props)
def __getitem__(self, guid):
@@ -202,10 +198,14 @@ class Directory(object):
if doc.post_seqno is not None and doc.exists:
# No need in after-merge event, further commit event
# is enough to avoid increasing events flow
- self._index.store(guid, doc.origs, self._preindex)
+ self._index.store(guid, doc.posts, self._preindex)
return seqno
+ def broadcast(self, event):
+ event['resource'] = self.metadata.name
+ self._broadcast(event)
+
def _open(self):
index_path = join(self._root, 'index', self.metadata.name)
if self._is_layout_stale():
@@ -219,10 +219,6 @@ class Directory(object):
self._storage = Storage(join(self._root, 'db', self.metadata.name))
_logger.debug('Open %r resource', self.resource)
- def _broadcast(self, event):
- event['resource'] = self.metadata.name
- this.broadcast(event)
-
def _preindex(self, guid, changes):
doc = self.resource(guid, self._storage.get(guid), changes)
for prop in self.metadata:
@@ -240,11 +236,11 @@ class Directory(object):
def _postdelete(self, guid, event):
self._storage.delete(guid)
- self._broadcast(event)
+ self.broadcast(event)
def _postcommit(self):
self._seqno.commit()
- self._broadcast({'event': 'commit', 'mtime': self._index.mtime})
+ self.broadcast({'event': 'commit', 'mtime': self._index.mtime})
def _save_layout(self):
path = join(self._root, 'index', self.metadata.name, 'layout')
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index 31cace1..67a6d13 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -374,6 +374,9 @@ class Aggregated(Composite):
def subtypecast(self, value):
return self._subtype.typecast(value)
+ def subreprcast(self, value):
+ return self._subtype.reprcast(value)
+
def subteardown(self, value):
self._subtype.teardown(value)
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index d17637d..38c1ce4 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -13,14 +13,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import time
+
from sugar_network.db.metadata import indexed_property, Localized
-from sugar_network.db.metadata import Numeric, List, Authors
+from sugar_network.db.metadata import Numeric, List, Authors, Enum
from sugar_network.db.metadata import Composite, Aggregated
-from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit.router import ACL
from sugar_network.toolkit import ranges
+STATES = ['active', 'deleted']
+STATUSES = ['featured']
+
+
class Resource(object):
"""Base class for all data classes."""
@@ -32,10 +37,13 @@ class Resource(object):
def __init__(self, guid, record, origs=None, posts=None):
self.origs = origs or {}
self.posts = posts or {}
- self.guid = guid
- self.is_new = not bool(guid)
self.record = record
self._post_seqno = None
+ self._guid = guid
+
+ @property
+ def guid(self):
+ return self._guid or self['guid']
@property
def post_seqno(self):
@@ -64,33 +72,34 @@ class Resource(object):
def author(self, value):
return value
- @indexed_property(List, prefix='RL', default=[])
- def layer(self, value):
- return value
-
- @layer.setter
- def layer(self, value):
- orig = self.orig('layer')
- if 'deleted' in value:
- if this.request.method != 'POST' and 'deleted' not in orig:
- self.deleted()
- elif this.request.method != 'POST' and 'deleted' in orig:
- self.restored()
+ @indexed_property(Enum, STATES, prefix='RE', default=STATES[0], acl=0)
+ def state(self, value):
return value
@indexed_property(List, prefix='RT', full_text=True, default=[])
def tags(self, value):
return value
+ @indexed_property(List, prefix='RU', default=[], acl=ACL.READ,
+ subtype=Enum(STATUSES))
+ def status(self, value):
+ return value
+
+ @indexed_property(List, prefix='RP', default=[])
+ def pins(self, value):
+ return value
+
@property
def exists(self):
return self.record is not None and self.record.consistent
- def deleted(self):
- pass
+ def created(self):
+ ts = int(time.time())
+ self.posts['ctime'] = ts
+ self.posts['mtime'] = ts
- def restored(self):
- pass
+ def updated(self):
+ self.posts['mtime'] = int(time.time())
def get(self, prop, default=None):
"""Get document's property value.
@@ -128,6 +137,19 @@ class Resource(object):
self.origs[prop.name] = value
return value
+ def repr(self, prop):
+ """Get property value with applying output typecasts.
+
+ Such property values should be used to return property
+ out from the system.
+
+ """
+ prop_ = self.metadata[prop]
+ value = prop_.reprcast(self.get(prop))
+ if prop_.on_get is not None:
+ value = prop_.on_get(self, value)
+ return value
+
def properties(self, props):
result = {}
for i in props:
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index e1f190c..f319658 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -14,7 +14,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
-import time
import logging
from contextlib import contextmanager
@@ -22,7 +21,7 @@ from sugar_network import toolkit
from sugar_network.db.metadata import Aggregated
from sugar_network.toolkit.router import ACL, File, route, fallbackroute
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit import http, parcel, enforce
_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$')
@@ -40,20 +39,17 @@ class Routes(object):
@route('POST', [None], acl=ACL.AUTH, mime_type='application/json')
def create(self, request):
with self._post(request, ACL.CREATE) as doc:
- self.on_create(request, doc.posts)
+ doc.created()
+ if request.principal:
+ authors = doc.posts['author'] = {}
+ self._useradd(authors, request.principal, ACL.ORIGINAL)
self.volume[request.resource].create(doc.posts)
- self.after_post(doc)
return doc['guid']
@route('GET', [None],
- arguments={
- 'offset': int,
- 'limit': int,
- 'layer': [],
- 'reply': ('guid',),
- },
+ arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
mime_type='application/json')
- def find(self, request, reply, limit, layer):
+ def find(self, request, reply, limit):
self._preget(request)
if self._find_limit:
if limit <= 0:
@@ -62,27 +58,22 @@ class Routes(object):
_logger.warning('The find limit is restricted to %s',
self._find_limit)
request['limit'] = self._find_limit
- if 'deleted' in layer:
- _logger.warning('Requesting "deleted" layer, will ignore')
- layer.remove('deleted')
documents, total = self.volume[request.resource].find(
- not_layer='deleted', **request)
+ not_state='deleted', **request)
result = [self._postget(request, i, reply) for i in documents]
return {'total': total, 'result': result}
@route('GET', [None, None], cmd='exists', mime_type='application/json')
def exists(self, request):
- directory = self.volume[request.resource]
- return directory.exists(request.guid)
+ return self.volume[request.resource][request.guid].exists
@route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update(self, request):
with self._post(request, ACL.WRITE) as doc:
if not doc.posts:
return
- self.on_update(request, doc.posts)
+ doc.updated()
self.volume[request.resource].update(doc.guid, doc.posts)
- self.after_post(doc)
@route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update_prop(self, request):
@@ -97,8 +88,12 @@ class Routes(object):
def delete(self, request):
# Node data should not be deleted immediately
# to make master-slave synchronization possible
- request.content = {'layer': 'deleted'}
- self.update(request)
+ directory = self.volume[request.resource]
+ doc = directory[request.guid]
+ enforce(doc.exists, http.NotFound, 'Resource not found')
+ doc.posts['state'] = 'deleted'
+ doc.updated()
+ directory.update(doc.guid, doc.posts, 'delete')
@route('GET', [None, None], arguments={'reply': list},
mime_type='application/json')
@@ -111,26 +106,16 @@ class Routes(object):
reply.append(prop.name)
self._preget(request)
doc = self.volume[request.resource].get(request.guid)
- enforce('deleted' not in doc['layer'], http.NotFound, 'Deleted')
+ enforce(doc.exists and doc['state'] != 'deleted', http.NotFound,
+ 'Resource not found')
return self._postget(request, doc, reply)
@route('GET', [None, None, None], mime_type='application/json')
def get_prop(self, request, response):
directory = self.volume[request.resource]
- doc = directory.get(request.guid)
-
- prop = directory.metadata[request.prop]
- prop.assert_access(ACL.READ)
-
- meta = doc.meta(prop.name)
- if meta:
- value = meta['value']
- response.last_modified = meta['mtime']
- else:
- value = prop.default
- value = _get_prop(doc, prop, value)
+ directory.metadata[request.prop].assert_access(ACL.READ)
+ value = directory[request.guid].repr(request.prop)
enforce(value is not File.AWAY, http.NotFound, 'No blob')
-
return value
@route('HEAD', [None, None, None])
@@ -152,6 +137,20 @@ class Routes(object):
def remove_from_aggprop(self, request):
self._aggpost(request, ACL.REMOVE, request.key)
+ @route('GET', [None, None, None, None], mime_type='application/json')
+ def get_aggprop(self):
+ doc = self.volume[this.request.resource][this.request.guid]
+ prop = doc.metadata[this.request.prop]
+ prop.assert_access(ACL.READ)
+ enforce(isinstance(prop, Aggregated), http.BadRequest,
+ 'Property is not aggregated')
+ agg_value = doc[prop.name].get(this.request.key)
+ enforce(agg_value is not None, http.NotFound,
+ 'Aggregated item not found')
+ value = prop.subreprcast(agg_value['value'])
+ enforce(value is not File.AWAY, http.NotFound, 'No blob')
+ return value
+
@route('PUT', [None, None], cmd='useradd',
arguments={'role': 0}, acl=ACL.AUTH | ACL.AUTHOR)
def useradd(self, request, user, role):
@@ -171,60 +170,50 @@ class Routes(object):
del authors[user]
directory.update(request.guid, {'author': authors})
+ @route('GET', [None, None], cmd='clone')
+ def clone(self, request):
+ clone = self.volume.clone(request.resource, request.guid)
+ return parcel.encode([('push', None, clone)])
+
@fallbackroute('GET', ['blobs'])
def blobs(self):
return this.volume.blobs.get(this.request.guid)
- def on_create(self, request, props):
- ts = int(time.time())
- props['ctime'] = ts
- props['mtime'] = ts
-
- if request.principal:
- authors = props['author'] = {}
- self._useradd(authors, request.principal, ACL.ORIGINAL)
-
- def on_update(self, request, props):
- props['mtime'] = int(time.time())
-
def on_aggprop_update(self, request, prop, value):
pass
- def after_post(self, doc):
- pass
-
@contextmanager
def _post(self, request, access):
content = request.content
enforce(isinstance(content, dict), http.BadRequest, 'Invalid value')
- directory = self.volume[request.resource]
if access == ACL.CREATE:
- doc = directory.resource(None, None)
if 'guid' in content:
# TODO Temporal security hole, see TODO
guid = content['guid']
- enforce(not directory.exists(guid),
- http.BadRequest, '%s already exists', guid)
enforce(_GUID_RE.match(guid) is not None,
http.BadRequest, 'Malformed %s GUID', guid)
else:
- doc.posts['guid'] = toolkit.uuid()
- for name, prop in directory.metadata.items():
+ guid = toolkit.uuid()
+ doc = self.volume[request.resource][guid]
+ enforce(not doc.exists, 'Resource already exists')
+ doc.posts['guid'] = guid
+ for name, prop in doc.metadata.items():
if name not in content and prop.default is not None:
doc.posts[name] = prop.default
else:
- doc = directory.get(request.guid)
+ doc = self.volume[request.resource][request.guid]
+ enforce(doc.exists, 'Resource not found')
this.resource = doc
def teardown(new, old):
for name, value in new.items():
if old.get(name) != value:
- directory.metadata[name].teardown(value)
+ doc.metadata[name].teardown(value)
try:
for name, value in content.items():
- prop = directory.metadata[name]
+ prop = doc.metadata[name]
prop.assert_access(access, doc.orig(name))
if value is None:
doc.posts[name] = prop.default
@@ -255,8 +244,7 @@ class Routes(object):
def _postget(self, request, doc, props):
result = {}
for name in props:
- prop = doc.metadata[name]
- value = _get_prop(doc, prop, doc.get(name))
+ value = doc.repr(name)
if isinstance(value, File):
value = value.url
result[name] = value
@@ -264,10 +252,9 @@ class Routes(object):
def _useradd(self, authors, user, role):
props = {}
-
- users = self.volume['user']
- if users.exists(user):
- props['name'] = users.get(user)['name']
+ user_doc = self.volume['user'][user]
+ if user_doc.exists:
+ props['name'] = user_doc['name']
role |= ACL.INSYSTEM
else:
role &= ~ACL.INSYSTEM
@@ -316,15 +303,8 @@ class Routes(object):
authors = aggvalue['author'] = {}
role = ACL.ORIGINAL if request.principal in doc['author'] else 0
self._useradd(authors, request.principal, role)
- props = {request.prop: {aggid: aggvalue}}
- self.on_update(request, props)
- self.volume[request.resource].update(request.guid, props)
+ doc.posts[request.prop] = {aggid: aggvalue}
+ doc.updated()
+ self.volume[request.resource].update(request.guid, doc.posts)
return aggid
-
-
-def _get_prop(doc, prop, value):
- value = prop.reprcast(value)
- if prop.on_get is not None:
- value = prop.on_get(doc, value)
- return value
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 5d9bac1..7bf738c 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -19,9 +19,11 @@ from copy import deepcopy
from os.path import exists, join, abspath
from sugar_network import toolkit
+from sugar_network.db.metadata import Blob
from sugar_network.db.directory import Directory
from sugar_network.db.index import IndexWriter
from sugar_network.db.blobs import Blobs
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, coroutine, ranges, enforce
@@ -35,6 +37,7 @@ class Volume(dict):
def __init__(self, root, documents, index_class=None):
Volume._flush_pool.append(self)
self.resources = {}
+ self.mute = False
self._populators = coroutine.Pool()
if index_class is None:
@@ -122,6 +125,17 @@ class Volume(dict):
ranges.exclude(r, None, last_seqno)
yield {'commit': commit_r}
+ def clone(self, resource, guid):
+ doc = self[resource][guid]
+ patch = doc.diff([[1, None]])
+ if not patch:
+ return
+ for name, prop in self[resource].metadata.items():
+ if isinstance(prop, Blob) and name in patch:
+ yield self.blobs.get(patch[name]['value'])
+ yield {'resource': resource}
+ yield {'guid': guid, 'patch': patch}
+
def patch(self, records):
directory = None
committed = []
@@ -150,6 +164,13 @@ class Volume(dict):
return seqno, committed
+ def broadcast(self, event):
+ if not self.mute:
+ if event['event'] == 'commit':
+ this.broadcast(event)
+ else:
+ this.localcast(event)
+
def __enter__(self):
return self
@@ -167,7 +188,8 @@ class Volume(dict):
cls = getattr(mod, name.capitalize())
else:
cls = resource
- dir_ = Directory(self._root, cls, self._index_class, self.seqno)
+ dir_ = Directory(self._root, cls, self._index_class, self.seqno,
+ self.broadcast)
self._populators.spawn(self._populate, dir_)
self[name] = dir_
return dir_
diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py
index 9e1aaf5..4ff89ff 100644
--- a/sugar_network/model/__init__.py
+++ b/sugar_network/model/__init__.py
@@ -122,7 +122,6 @@ def generate_node_stats(volume):
def load_bundle(blob, context=None, initial=False, extra_deps=None):
- contexts = this.volume['context']
context_type = None
context_meta = None
release_notes = None
@@ -186,18 +185,21 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
enforce(context, http.BadRequest, 'Context is not specified')
enforce(version, http.BadRequest, 'Version is not specified')
release['version'] = parse_version(version)
- if initial and not contexts.exists(context):
- enforce(context_meta, http.BadRequest, 'No way to initate context')
- context_meta['guid'] = context
- context_meta['type'] = [context_type]
- this.call(method='POST', path=['context'], content=context_meta)
+
+ doc = this.volume['context'][context]
+ if initial:
+ if not doc.exists:
+ enforce(context_meta, http.BadRequest, 'No way to initate context')
+ context_meta['guid'] = context
+ context_meta['type'] = [context_type]
+ this.call(method='POST', path=['context'], content=context_meta)
else:
- enforce(context_type in contexts[context]['type'],
+ enforce(doc.exists, http.NotFound, 'No context')
+ enforce(context_type in doc['type'],
http.BadRequest, 'Inappropriate bundle type')
- context_doc = contexts[context]
if 'license' not in release:
- releases = context_doc['releases'].values()
+ releases = doc['releases'].values()
enforce(releases, http.BadRequest, 'License is not specified')
recent = max(releases, key=lambda x: x.get('value', {}).get('release'))
enforce(recent, http.BadRequest, 'License is not specified')
@@ -205,11 +207,11 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
_logger.debug('Load %r release: %r', context, release)
- if this.request.principal in context_doc['author']:
- patch = context_doc.format_patch(context_meta)
+ if this.request.principal in doc['author']:
+ patch = doc.format_patch(context_meta)
if patch:
this.call(method='PUT', path=['context', context], content=patch)
- context_doc.posts.update(patch)
+ doc.posts.update(patch)
# TRANS: Release notes title
title = i18n._('%(name)s %(version)s release')
else:
@@ -220,7 +222,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
'context': context,
'type': 'notification',
'title': i18n.encode(title,
- name=context_doc['title'],
+ name=doc['title'],
version=version,
),
'message': release_notes or '',
@@ -228,7 +230,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
content_type='application/json')
blob['content-disposition'] = 'attachment; filename="%s-%s%s"' % (
- ''.join(i18n.decode(context_doc['title']).split()),
+ ''.join(i18n.decode(doc['title']).split()),
version, mimetypes.guess_extension(blob.get('content-type')) or '',
)
this.volume.blobs.update(blob.digest, blob)
diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py
index 3aceacc..5e12360 100644
--- a/sugar_network/model/context.py
+++ b/sugar_network/model/context.py
@@ -21,7 +21,11 @@ from sugar_network.toolkit import svg_to_png
class Context(db.Resource):
- @db.indexed_property(db.List, prefix='T', full_text=True,
+ @db.indexed_property(db.List, prefix='P', default=[])
+ def pins(self, value):
+ return value
+
+ @db.indexed_property(db.List, prefix='T',
subtype=db.Enum(model.CONTEXT_TYPES))
def type(self, value):
return value
@@ -74,7 +78,7 @@ class Context(db.Resource):
def homepage(self, value):
return value
- @db.indexed_property(db.List, prefix='Y', default=[], full_text=True)
+ @db.indexed_property(db.List, prefix='Y', default=[])
def mime_types(self, value):
return value
@@ -90,7 +94,8 @@ class Context(db.Resource):
def logo(self, value):
return value
- @db.stored_property(db.Aggregated, subtype=db.Blob())
+ @db.stored_property(db.Aggregated, subtype=db.Blob(),
+ acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.AUTHOR)
def previews(self, value):
return value
@@ -99,12 +104,6 @@ class Context(db.Resource):
def releases(self, value):
return value
- @releases.setter
- def releases(self, value):
- if value or this.request.method != 'POST':
- self.invalidate_solutions()
- return value
-
@db.indexed_property(db.Numeric, slot=2, default=0,
acl=ACL.READ | ACL.CALC)
def downloads(self, value):
@@ -124,20 +123,19 @@ class Context(db.Resource):
"""
return value
- @dependencies.setter
- def dependencies(self, value):
- if value or this.request.method != 'POST':
- self.invalidate_solutions()
- return value
-
- def deleted(self):
- self.invalidate_solutions()
+ def created(self):
+ db.Resource.created(self)
+ self._invalidate_solutions()
- def restored(self):
- self.invalidate_solutions()
+ def updated(self):
+ db.Resource.updated(self)
+ self._invalidate_solutions()
- def invalidate_solutions(self):
- this.broadcast({
- 'event': 'release',
- 'seqno': this.volume.releases_seqno.next(),
- })
+ def _invalidate_solutions(self):
+ if self['releases'] and \
+ [i for i in ('state', 'releases', 'dependencies')
+ if i in self.posts and self.posts[i] != self.orig(i)]:
+ this.broadcast({
+ 'event': 'release',
+ 'seqno': this.volume.releases_seqno.next(),
+ })
diff --git a/sugar_network/model/post.py b/sugar_network/model/post.py
index 21046f2..d924617 100644
--- a/sugar_network/model/post.py
+++ b/sugar_network/model/post.py
@@ -74,7 +74,8 @@ class Post(db.Resource):
def preview(self, value):
return value
- @db.stored_property(db.Aggregated, subtype=db.Blob())
+ @db.stored_property(db.Aggregated, subtype=db.Blob(),
+ acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.AUTHOR)
def attachments(self, value):
if value:
value['name'] = self['title']
diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py
index be9fd9f..a434a6d 100644
--- a/sugar_network/model/report.py
+++ b/sugar_network/model/report.py
@@ -60,6 +60,7 @@ class Report(db.Resource):
def solution(self, value):
return value
- @db.stored_property(db.Aggregated, subtype=db.Blob())
+ @db.stored_property(db.Aggregated, subtype=db.Blob(),
+ acl=ACL.READ | ACL.INSERT | ACL.AUTHOR)
def logs(self, value):
return value
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index af19023..fb409d4 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -28,6 +28,7 @@ class FrontRoutes(object):
def __init__(self):
self._spooler = coroutine.Spooler()
this.broadcast = self._broadcast
+ this.localcast = self._broadcast
@route('GET', mime_type='text/html')
def hello(self):
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index 61d32fb..b93dcbc 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -17,6 +17,9 @@ import logging
from urlparse import urlsplit
from sugar_network import toolkit
+from sugar_network.model.post import Post
+from sugar_network.model.report import Report
+from sugar_network.node.model import User, Context
from sugar_network.node import obs, master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
@@ -24,12 +27,7 @@ from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, parcel, pylru, ranges, enforce
-RESOURCES = (
- 'sugar_network.node.model',
- 'sugar_network.model.post',
- 'sugar_network.model.report',
- 'sugar_network.model.user',
- )
+RESOURCES = (User, Context, Post, Report)
_logger = logging.getLogger('node.master')
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index 8de6038..8f9819b 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -14,10 +14,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import bisect
+import hashlib
import logging
from sugar_network import db
-from sugar_network.model import Release, context as base_context
+from sugar_network.model import Release, context as _context, user as _user
+
from sugar_network.node import obs
from sugar_network.toolkit.router import ACL
from sugar_network.toolkit.coroutine import this
@@ -28,6 +30,13 @@ _logger = logging.getLogger('node.model')
_presolve_queue = None
+class User(_user.User):
+
+ def created(self):
+ with file(this.volume.blobs.get(self['pubkey']).path) as f:
+ self.posts['guid'] = str(hashlib.sha1(f.read()).hexdigest())
+
+
class _Release(Release):
_package_cast = db.Dict(db.List())
@@ -87,23 +96,17 @@ class _Release(Release):
def teardown(self, value):
if 'package' not in this.resource['type']:
- return Release.typecast(self, value)
+ return Release.teardown(self, value)
# TODO Delete presolved files
-class Context(base_context.Context):
+class Context(_context.Context):
@db.stored_property(db.Aggregated, subtype=_Release(),
acl=ACL.READ | ACL.INSERT | ACL.REMOVE | ACL.REPLACE)
def releases(self, value):
return value
- @releases.setter
- def releases(self, value):
- if value or this.request.method != 'POST':
- self.invalidate_solutions()
- return value
-
def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
stability=None, requires=None):
@@ -151,6 +154,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
if context in context_clauses:
return context_clauses[context]
context = volume['context'][context]
+ enforce(context.exists, http.NotFound, 'Context not found')
releases = context['releases']
clause = []
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index 5fdb27e..ea23297 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -121,9 +121,9 @@ class NodeRoutes(db.Routes, FrontRoutes):
enforce(solution is not None, 'Failed to solve')
return solution
- @route('GET', ['context', None], cmd='clone',
- arguments={'requires': list})
- def get_clone(self, request, response):
+ @route('GET', ['context', None], cmd='resolve',
+ arguments={'requires': list, 'stability': list})
+ def resolve(self, request):
solution = self.solve(request)
return this.volume.blobs.get(solution[request.guid]['blob'])
@@ -149,12 +149,6 @@ class NodeRoutes(db.Routes, FrontRoutes):
enforce(self.authorize(request.principal, 'root'), http.Forbidden,
'Operation is permitted only for superusers')
- def on_create(self, request, props):
- if request.resource == 'user':
- with file(this.volume.blobs.get(props['pubkey']).path) as f:
- props['guid'] = str(hashlib.sha1(f.read()).hexdigest())
- db.Routes.on_create(self, request, props)
-
def on_aggprop_update(self, request, prop, value):
if prop.acl & ACL.AUTHOR:
self._enforce_authority(request)
@@ -164,8 +158,8 @@ class NodeRoutes(db.Routes, FrontRoutes):
def authenticate(self, auth):
enforce(auth.scheme == 'sugar', http.BadRequest,
'Unknown authentication scheme')
- if not self.volume['user'].exists(auth.login):
- raise Unauthorized('Principal does not exist', auth.nonce)
+ enforce(self.volume['user'][auth.login].exists, Unauthorized,
+ 'Principal does not exist')
from M2Crypto import RSA
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 333e6ea..76593e9 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -22,6 +22,10 @@ from os.path import join, dirname, exists, isabs
from gettext import gettext as _
from sugar_network import toolkit
+from sugar_network.model.context import Context
+from sugar_network.model.post import Post
+from sugar_network.model.report import Report
+from sugar_network.node.model import User
from sugar_network.node import master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
@@ -29,6 +33,8 @@ from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, parcel, ranges, enforce
+RESOURCES = (User, Context, Post, Report)
+
_logger = logging.getLogger('node.slave')
@@ -46,7 +52,6 @@ class SlaveRoutes(NodeRoutes):
@route('POST', cmd='online_sync', acl=ACL.LOCAL,
arguments={'no_pull': bool})
def online_sync(self, no_pull=False):
- self._export(not no_pull)
conn = http.Connection(master_api.value)
response = conn.request('POST',
data=parcel.encode(self._export(not no_pull), header={
@@ -100,22 +105,22 @@ class SlaveRoutes(NodeRoutes):
seqno, committed = this.volume.patch(packet)
if seqno is not None:
if from_master:
- ranges.exclude(self._pull_r.value, committed)
- self._pull_r.commit()
+ with self._pull_r as r:
+ ranges.exclude(r, committed)
else:
requests.append(('request', {
'origin': sender,
'ranges': committed,
}, []))
- ranges.exclude(self._push_r.value, seqno, seqno)
- self._push_r.commit()
+ with self._push_r as r:
+ ranges.exclude(r, seqno, seqno)
elif packet.name == 'ack' and from_master and \
packet['to'] == self.guid:
- ranges.exclude(self._pull_r.value, packet['ack'])
- self._pull_r.commit()
+ with self._pull_r as r:
+ ranges.exclude(r, packet['ack'])
if packet['ranges']:
- ranges.exclude(self._push_r.value, packet['ranges'])
- self._push_r.commit()
+ with self._push_r as r:
+ ranges.exclude(r, packet['ranges'])
return requests
diff --git a/sugar_network/toolkit/coroutine.py b/sugar_network/toolkit/coroutine.py
index b43c1e9..4a54975 100644
--- a/sugar_network/toolkit/coroutine.py
+++ b/sugar_network/toolkit/coroutine.py
@@ -370,8 +370,9 @@ def _print_exception(context, klass, value, tb):
context = 'Undefined'
elif not isinstance(context, basestring):
if isinstance(context, dict) and 'PATH_INFO' in context:
- context_repr = '%s%s' % \
- (context['PATH_INFO'], context.get('QUERY_STRING') or '')
+ context_repr = context['PATH_INFO']
+ if 'QUERY_STRING' in context:
+ context_repr += '?' + context['QUERY_STRING']
try:
context = self.format_context(context)
except Exception:
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 9b9754e..9dd437e 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -202,6 +202,9 @@ class Connection(object):
if not isinstance(path, basestring):
path = '/'.join([i.strip('/') for i in [self.url] + path])
+ # TODO Disable cookies on requests library level
+ self._session.cookies.clear()
+
try_ = 0
while True:
try_ += 1
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 8e23863..8eb84da 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -602,11 +602,11 @@ class Router(object):
request.ensure_content()
coroutine.spawn(self._event_stream, request, result)
result = None
+ elif route_.mime_type and 'content-type' not in response:
+ response.set('content-type', route_.mime_type)
except Exception, exception:
+ # To populate `exception` only
raise
- else:
- if route_.mime_type and 'content-type' not in response:
- response.set('content-type', route_.mime_type)
finally:
for i in self._postroutes:
i(request, response, result, exception)
@@ -689,7 +689,8 @@ class Router(object):
elif not streamed_content:
if response.content_type == 'application/json':
content = json.dumps(content)
- if 'content-length' not in response:
+ response.content_length = len(content)
+ elif 'content-length' not in response:
response.content_length = len(content) if content else 0
if request.method == 'HEAD' and content is not None:
_logger.warning('Content from HEAD response is ignored')
@@ -753,21 +754,12 @@ class Router(object):
commons['guid'] = request.guid
if request.prop:
commons['prop'] = request.prop
- try:
- for event in _event_stream(request, stream):
- if 'event' not in event:
- commons.update(event)
- else:
- event.update(commons)
- this.localcast(event)
- except Exception, error:
- _logger.exception('Event stream %r failed', request)
- event = {'event': 'failure',
- 'exception': type(error).__name__,
- 'error': str(error),
- }
- event.update(commons)
- this.localcast(event)
+ for event in _event_stream(request, stream):
+ if 'event' not in event:
+ commons.update(event)
+ else:
+ event.update(commons)
+ this.localcast(event)
def _assert_origin(self, environ):
origin = environ['HTTP_ORIGIN']
@@ -837,6 +829,12 @@ def _event_stream(request, stream):
event[0].update(i)
event = event[0]
yield event
+ except Exception, error:
+ _logger.exception('Event stream %r failed', request)
+ yield {'event': 'failure',
+ 'exception': type(error).__name__,
+ 'error': str(error),
+ }
finally:
_logger.debug('Event stream %r exited', request)