Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2014-04-20 11:24:21 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2014-04-20 11:24:21 (GMT)
commit046073b04229021ec53833a353ffd069d0a5b561 (patch)
tree1930c720a4391daeaf3e8540b2b027f9cd1ab97f
parent71391e654f497234fac0a4602bba769820aa521c (diff)
Pull node updates for checked-in resources
-rw-r--r--TODO3
-rwxr-xr-xsugar-network-client4
-rw-r--r--sugar_network/client/model.py12
-rw-r--r--sugar_network/client/routes.py239
-rw-r--r--sugar_network/db/blobs.py4
-rw-r--r--sugar_network/db/directory.py23
-rw-r--r--sugar_network/db/index.py19
-rw-r--r--sugar_network/db/resource.py17
-rw-r--r--sugar_network/db/routes.py125
-rw-r--r--sugar_network/db/volume.py43
-rw-r--r--sugar_network/model/__init__.py2
-rw-r--r--sugar_network/model/context.py4
-rw-r--r--sugar_network/model/routes.py15
-rw-r--r--sugar_network/node/auth.py4
-rw-r--r--sugar_network/node/model.py2
-rw-r--r--sugar_network/node/routes.py9
-rw-r--r--sugar_network/toolkit/__init__.py65
-rw-r--r--sugar_network/toolkit/http.py1
-rw-r--r--sugar_network/toolkit/parcel.py113
-rw-r--r--sugar_network/toolkit/router.py70
-rw-r--r--tests/__init__.py14
-rwxr-xr-xtests/units/client/injector.py96
-rwxr-xr-xtests/units/client/routes.py209
-rwxr-xr-xtests/units/db/index.py8
-rwxr-xr-xtests/units/db/resource.py39
-rwxr-xr-xtests/units/db/routes.py167
-rwxr-xr-xtests/units/db/volume.py216
-rwxr-xr-xtests/units/node/slave.py6
-rwxr-xr-xtests/units/toolkit/parcel.py148
-rwxr-xr-xtests/units/toolkit/router.py25
30 files changed, 1268 insertions, 434 deletions
diff --git a/TODO b/TODO
index aadb695..b841d22 100644
--- a/TODO
+++ b/TODO
@@ -1,6 +1,5 @@
- proxying as a tool to sort out downstream content
- push local offline changes to the node on getting online
-- diff/merge while checking in node context
- deliver spawn events only to local subscribers
- test/run presolve
- if node relocates api calls, do it only once in toolkit.http
@@ -9,6 +8,8 @@
- cache init sync pull
- switch auth from WWW-AUTHENTICATE to mutual authentication over the HTTPS
- restrict ACL.LOCAL routes only to localhost clients
+- prevent calling diff api cmd from clients to avoid disclosuring private props
+- pull node changes periodically for checked-in contexts
v2.0
====
diff --git a/sugar-network-client b/sugar-network-client
index 386a3b8..7e51aa2 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -28,7 +28,7 @@ coroutine.inject()
import sugar_network_webui as webui
from sugar_network import db, toolkit, client, node
-from sugar_network.client.routes import CachedClientRoutes
+from sugar_network.client.routes import ClientRoutes
from sugar_network.client.injector import Injector
from sugar_network.client.model import Volume
from sugar_network.client.auth import BasicCreds, SugarCreds
@@ -112,7 +112,7 @@ class Application(application.Daemon):
creds = SugarCreds(client.keyfile.value)
else:
raise RuntimeError('No credentials specified')
- routes = CachedClientRoutes(volume, creds)
+ routes = ClientRoutes(volume, creds)
router = Router(routes, allow_spawn=True)
logging.info('Listening for IPC requests on %s port',
diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py
index 70c8f46..fd85a4d 100644
--- a/sugar_network/client/model.py
+++ b/sugar_network/client/model.py
@@ -29,7 +29,7 @@ _logger = logging.getLogger('client.model')
class Context(_Context):
- @db.indexed_property(db.List, prefix='RP', default=[],
+ @db.indexed_property(db.List, prefix='P', default=[],
acl=ACL.READ | ACL.LOCAL)
def pins(self, value):
return value + this.injector.pins(self.guid)
@@ -37,7 +37,9 @@ class Context(_Context):
class Volume(db.Volume):
- def __init__(self, root):
- db.Volume.__init__(self, root, [User, Context, Post, Report])
- for resource in ('user', 'context', 'post'):
- self[resource].metadata['author'].acl |= ACL.LOCAL
+ def __init__(self, root, resources=None):
+ if resources is None:
+ resources = [User, Context, Post, Report]
+ db.Volume.__init__(self, root, resources)
+ for directory in self.values():
+ directory.metadata['author'].acl |= ACL.LOCAL
diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py
index f580789..f618df3 100644
--- a/sugar_network/client/routes.py
+++ b/sugar_network/client/routes.py
@@ -18,42 +18,46 @@ import logging
from httplib import IncompleteRead
from os.path import join
-from sugar_network import db, client, node, toolkit, model
-from sugar_network.client import journal
-from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit.router import Request, Router, File
+from sugar_network import db, client, node, toolkit
+from sugar_network.model import FrontRoutes
+from sugar_network.client.journal import Routes as JournalRoutes
+from sugar_network.toolkit.router import Request, Router, Response
from sugar_network.toolkit.router import route, fallbackroute
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel
from sugar_network.toolkit import ranges, lsb_release, enforce
-# Flag file to recognize a directory as a synchronization directory
+_SYNC_TIMEOUT = 30
_RECONNECT_TIMEOUT = 3
_RECONNECT_TIMEOUT_MAX = 60 * 15
_logger = logging.getLogger('client.routes')
-class ClientRoutes(model.FrontRoutes, journal.Routes):
+class ClientRoutes(FrontRoutes, JournalRoutes):
def __init__(self, home_volume, creds, no_subscription=False):
- model.FrontRoutes.__init__(self)
- journal.Routes.__init__(self)
+ FrontRoutes.__init__(self)
+ JournalRoutes.__init__(self)
this.localcast = this.broadcast
self._local = _LocalRoutes(home_volume)
+ self._remote = None
+ self._remote_urls = []
self._creds = creds
self._inline = coroutine.Event()
self._inline_job = coroutine.Pool()
- self._remote_urls = []
- self._node = None
self._connect_jobs = coroutine.Pool()
+ self._sync_jobs = coroutine.Pool()
self._no_subscription = no_subscription
+ self._pull_r = toolkit.Bin(
+ join(home_volume.root, 'var', 'pull'), [[1, None]])
self._push_r = toolkit.Bin(
- join(home_volume.root, 'var', 'push'),
- [[1, None]])
- self._push_job = coroutine.Pool()
+ join(home_volume.root, 'var', 'push'), [[1, None]])
+ self._push_guids_map = toolkit.Bin(
+ join(home_volume.root, 'var', 'push-guids'), {})
def connect(self, api=None):
if self._connect_jobs:
@@ -64,11 +68,13 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
else:
self._remote_urls.append(api)
self._connect_jobs.spawn(self._wait_for_connectivity)
+ self._local.volume.populate()
def close(self):
self._connect_jobs.kill()
self._got_offline()
self._local.volume.close()
+ self._pull_r.commit()
@fallbackroute('GET', ['hub'])
def hub(self):
@@ -99,7 +105,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
@fallbackroute('GET', ['packages'])
def route_packages(self):
- if self._inline.is_set():
+ if self.inline():
return self.fallback()
else:
# Let caller know that we are in offline and
@@ -113,7 +119,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
@route('GET', cmd='whoami', mime_type='application/json')
def whoami(self):
- if self._inline.is_set():
+ if self.inline():
result = self.fallback()
result['route'] = 'proxy'
else:
@@ -121,47 +127,6 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
result['guid'] = self._creds.login
return result
- @route('GET', [None],
- arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
- mime_type='application/json')
- def find(self):
- request = this.request
- if not self._inline.is_set() or 'pins' in request:
- return self._local.call(request, this.response)
-
- reply = request.setdefault('reply', ['guid'])
- if 'pins' not in reply:
- return self.fallback()
-
- if 'guid' not in reply:
- # Otherwise there is no way to mixin `pins`
- reply.append('guid')
- result = self.fallback()
-
- directory = self._local.volume[request.resource]
- for item in result['result']:
- doc = directory[item['guid']]
- if doc.exists:
- item['pins'] += doc.repr('pins')
-
- return result
-
- @route('GET', [None, None], mime_type='application/json')
- def get(self):
- request = this.request
- if self._local.volume[request.resource][request.guid].exists:
- return self._local.call(request, this.response)
- else:
- return self.fallback()
-
- @route('GET', [None, None, None], mime_type='application/json')
- def get_prop(self):
- request = this.request
- if self._local.volume[request.resource][request.guid].exists:
- return self._local.call(request, this.response)
- else:
- return self.fallback()
-
@route('POST', ['report'], cmd='submit', mime_type='text/event-stream')
def submit_report(self):
props = this.request.content
@@ -208,6 +173,62 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
def recycle(self):
return this.injector.recycle()
+ @route('GET', [None],
+ arguments={'offset': int, 'limit': int, 'reply': ['guid']},
+ mime_type='application/json')
+ def find(self, reply):
+ request = this.request
+ if not self.inline() or 'pins' in request:
+ return self._local.call(request, this.response)
+ if 'guid' not in reply:
+ # Otherwise no way to mixin `pins` or sync checkins
+ reply.append('guid')
+ if 'mtime' not in reply:
+ # To track updates for checked-in resources
+ reply.append('mtime')
+ result = self.fallback()
+ directory = self._local.volume[request.resource]
+ for item in result['result']:
+ checkin = directory[item['guid']]
+ if not checkin.exists:
+ continue
+ pins = item['pins'] = checkin.repr('pins')
+ if pins and item['mtime'] > checkin['mtime']:
+ pull = Request(method='GET',
+ path=[checkin.metadata.name, checkin.guid], cmd='diff')
+ self._sync_jobs.spawn(self._pull_checkin, pull, None, 'range')
+ return result
+
+ @route('GET', [None, None], mime_type='application/json')
+ def get(self):
+ request = this.request
+ if self._local.volume[request.resource][request.guid].exists:
+ return self._local.call(request, this.response)
+ else:
+ return self.fallback()
+
+ @route('GET', [None, None, None], mime_type='application/json')
+ def get_prop(self):
+ return self.get()
+
+ @route('PUT', [None, None])
+ def update(self):
+ if not self.inline():
+ return self.fallback()
+ request = this.request
+ local = self._local.volume[request.resource][request.guid]
+ if not local.exists or not local.repr('pins'):
+ return self.fallback()
+ self._pull_checkin(request, None, 'pull')
+
+ @route('PUT', [None, None, None])
+ def update_prop(self):
+ self.update()
+
+ @route('DELETE', [None, None])
+ def delete(self):
+ self.update()
+
@fallbackroute()
def fallback(self, request=None, response=None, **kwargs):
if request is None:
@@ -215,18 +236,18 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
if response is None:
response = this.response
- if not self._inline.is_set():
+ if not self.inline():
return self._local.call(request, response)
try:
- reply = self._node.call(request, response)
- if hasattr(reply, 'read'):
+ result = self._remote.call(request, response)
+ if hasattr(result, 'read'):
if response.relocations:
- return reply
+ return result
else:
- return _ResponseStream(reply, self._restart_online)
+ return _ResponseStream(result, self._restart_online)
else:
- return reply
+ return result
except (http.ConnectionError, IncompleteRead):
if response.relocations:
raise
@@ -234,28 +255,30 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
return self._local.call(request, response)
def _got_online(self, url):
- enforce(not self._inline.is_set())
- _logger.debug('Got online on %r', self._node)
+ enforce(not self.inline())
+ _logger.debug('Got online on %r', self._remote)
self._inline.set()
self._local.volume.mute = True
this.injector.api = url
this.localcast({'event': 'inline', 'state': 'online'})
- self._push_job.spawn(self._push)
+ if not self._local.volume.empty:
+ self._sync_jobs.spawn_later(_SYNC_TIMEOUT, self._sync)
def _got_offline(self):
- if self._node is not None:
- self._node.close()
- if self._inline.is_set():
- _logger.debug('Got offline on %r', self._node)
+ if self._remote is not None:
+ self._remote.close()
+ self._remote = None
+ if self.inline():
+ _logger.debug('Got offline on %r', self._remote)
self._inline.clear()
self._local.volume.mute = False
this.injector.api = None
this.localcast({'event': 'inline', 'state': 'offline'})
- self._push_job.kill()
+ self._sync_jobs.kill()
def _restart_online(self):
_logger.debug('Lost %r connection, try to reconnect in %s seconds',
- self._node, _RECONNECT_TIMEOUT)
+ self._remote, _RECONNECT_TIMEOUT)
self._remote_connect(_RECONNECT_TIMEOUT)
def _discover_node(self):
@@ -275,19 +298,19 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
def _remote_connect(self, timeout=0):
def pull_events():
- for event in self._node.subscribe():
+ for event in self._remote.subscribe():
if event.get('event') == 'release':
this.injector.seqno = event['seqno']
this.broadcast(event)
def handshake(url):
_logger.debug('Connecting to %r node', url)
- self._node = client.Connection(url, creds=self._creds)
- status = self._node.get(cmd='status')
+ self._remote = client.Connection(url, creds=self._creds)
+ status = self._remote.get(cmd='status')
seqno = status.get('seqno')
if seqno and 'releases' in seqno:
this.injector.seqno = seqno['releases']
- if self._inline.is_set():
+ if self.inline():
_logger.info('Reconnected to %r node', url)
else:
self._got_online(url)
@@ -322,36 +345,63 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
self._inline_job.spawn_later(timeout, connect)
def _checkin_context(self, pin=None):
- context = this.volume['context'][this.request.guid]
- if not context.exists:
+ contexts = self._local.volume['context']
+ local_context = contexts[this.request.guid]
+ if not local_context.exists:
enforce(self.inline(), http.ServiceUnavailable,
'Not available in offline')
- _logger.debug('Checkin %r context', context.guid)
- clone = self.fallback(
- method='GET', path=['context', context.guid], cmd='clone')
- seqno, __ = this.volume.patch(next(parcel.decode(clone)))
- if seqno:
- ranges.exclude(self._push_r.value, seqno, seqno)
- pins = context['pins']
+ _logger.debug('Checkin %r context', local_context.guid)
+ pull = Request(method='GET',
+ path=['context', local_context.guid], cmd='diff')
+ self._pull_checkin(pull, None, 'range')
+ pins = local_context['pins']
if pin and pin not in pins:
- this.volume['context'].update(context.guid, {'pins': pins + [pin]})
+ contexts.update(local_context.guid, {'pins': pins + [pin]})
def _checkout_context(self, pin=None):
- directory = this.volume['context']
- context = directory[this.request.guid]
- if not context.exists:
+ contexts = self._local.volume['context']
+ local_context = contexts[this.request.guid]
+ if not local_context.exists:
return
- pins = set(context.repr('pins'))
+ pins = set(local_context.repr('pins'))
if pin:
pins -= set([pin])
- if not self._inline.is_set() or pins:
+ if not self.inline() or pins:
if pin:
- directory.update(context.guid, {'pins': list(pins)})
+ contexts.update(local_context.guid, {'pins': list(pins)})
else:
- directory.delete(context.guid)
+ contexts.delete(local_context.guid)
+
+ def _pull_checkin(self, request, response, header_key):
+ request.headers[header_key] = self._pull_r.value
+ patch = self.fallback(request, response)
+ __, committed = self._local.volume.patch(next(parcel.decode(patch)),
+ shift_seqno=False)
+ ranges.exclude(self._pull_r.value, committed)
+
+ def _sync(self):
+ _logger.info('Start pulling updates')
+
+ for directory in self._local.volume.values():
+ if directory.empty:
+ continue
+ request = Request(method='GET',
+ path=[directory.metadata.name], cmd='diff')
+ response = Response()
+ while True:
+ request.headers['range'] = self._pull_r.value
+ r, guids = self.fallback(request, response)
+ if not r:
+ break
+ for guid in guids:
+ checkin = Request(method='GET',
+ path=[request.resource, guid], cmd='diff')
+ self._pull_checkin(checkin, response, 'range')
+ ranges.exclude(self._pull_r.value, r)
+ self._pull_r.commit()
+ this.localcast({'event': 'sync', 'state': 'pull'})
- def _push(self):
- return
+ """
resource = None
metadata = None
@@ -396,6 +446,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes):
request.content_type = 'application/json'
request.content = props
self.fallback(request)
+ """
class _LocalRoutes(db.Routes, Router):
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index 54fd78a..ce5bb1b 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -150,7 +150,7 @@ class Blobs(object):
if exists(path):
stat = os.stat(path)
if seqno != int(stat.st_mtime):
- _logger.debug('Found updated %r file', path)
+ _logger.debug('Found updated %r blob', path)
seqno = self._seqno.next()
meta = _read_meta(path)
meta['x-seqno'] = str(seqno)
@@ -169,7 +169,7 @@ class Blobs(object):
elif not is_files or exists(path + _META_SUFFIX):
continue
else:
- _logger.debug('Found new %r file', path)
+ _logger.debug('Found new %r blob', path)
mime_type = mimetypes.guess_type(filename)[0] or \
'application/octet-stream'
if checkin_seqno is None:
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py
index ecda920..17ff27d 100644
--- a/sugar_network/db/directory.py
+++ b/sugar_network/db/directory.py
@@ -56,6 +56,10 @@ class Directory(object):
self._open()
+ @property
+ def empty(self):
+ return True if self._index is None else (self._index.mtime == 0)
+
def wipe(self):
self.close()
_logger.debug('Wipe %r directory', self.metadata.name)
@@ -182,21 +186,32 @@ class Directory(object):
self._save_layout()
self.commit()
+ def diff(self, r):
+ for start, end in r:
+ query = 'seqno:%s..' % start
+ if end:
+ query += str(end)
+ docs, __ = self.find(query=query, order_by='seqno')
+ for doc in docs:
+ yield doc
+
def patch(self, guid, patch, seqno=None):
"""Apply changes for documents."""
doc = self.resource(guid, self._storage.get(guid))
+ merged = False
for prop, meta in patch.items():
orig_meta = doc.meta(prop)
if orig_meta and orig_meta['mtime'] >= meta['mtime']:
continue
- if doc.post_seqno is None:
- if seqno is None:
+ if doc.post_seqno is None and seqno is not False:
+ if not seqno:
seqno = self._seqno.next()
doc.post_seqno = seqno
doc.post(prop, **meta)
+ merged = True
- if doc.post_seqno is not None and doc.exists:
+ if merged and doc.exists:
# No need in after-merge event, further commit event
# is enough to avoid increasing events flow
self._index.store(guid, doc.posts, self._preindex)
@@ -234,6 +249,8 @@ class Directory(object):
if not doc.post_seqno and not doc.metadata[prop].acl & ACL.LOCAL:
doc.post_seqno = self._seqno.next()
doc.post(prop, changes[prop])
+ if not doc.exists:
+ return None
for prop in self.metadata.keys():
enforce(doc[prop] is not None, 'Empty %r property', prop)
return doc
diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py
index 89ea6e8..0270dd4 100644
--- a/sugar_network/db/index.py
+++ b/sugar_network/db/index.py
@@ -70,14 +70,13 @@ class IndexReader(object):
@property
def mtime(self):
"""UNIX seconds of the last `commit()` call."""
- return int(os.stat(self._mtime_path).st_mtime)
+ if exists(self._mtime_path):
+ return int(os.stat(self._mtime_path).st_mtime)
+ else:
+ return 0
def ensure_open(self):
- if not exists(self._mtime_path):
- with file(self._mtime_path, 'w'):
- pass
- # Outter code should understand the initial state
- os.utime(self._mtime_path, (0, 0))
+ pass
def get_cached(self, guid):
"""Return cached document.
@@ -337,6 +336,8 @@ class IndexWriter(IndexReader):
if pre_cb is not None:
properties = pre_cb(guid, properties, *args)
+ if properties is None:
+ return
_logger.debug('Index %r object: %r', self.metadata.name, properties)
@@ -419,7 +420,11 @@ class IndexWriter(IndexReader):
self._db.flush()
checkpoint = time.time()
- os.utime(self._mtime_path, (checkpoint, checkpoint))
+ if exists(self._mtime_path):
+ os.utime(self._mtime_path, (checkpoint, checkpoint))
+ else:
+ with file(self._mtime_path, 'w'):
+ pass
self._pending_updates = 0
_logger.debug('Commit to %r took %s seconds',
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index 9af5086..2c2e46b 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -55,7 +55,7 @@ class Resource(object):
self._post_seqno = value
self.post('seqno', value)
- @indexed_property(Numeric, slot=1000, prefix='RS', acl=0)
+ @indexed_property(Numeric, slot=1000, prefix='RS', acl=0, default=0)
def seqno(self, value):
return value
@@ -85,7 +85,8 @@ class Resource(object):
def status(self, value):
return value
- @indexed_property(List, prefix='RP', default=[], acl=ACL.READ)
+ @indexed_property(List, prefix='RP', default=[],
+ acl=ACL.READ | ACL.LOCAL)
def pins(self, value):
return value
@@ -93,6 +94,10 @@ class Resource(object):
def exists(self):
return self.record is not None and self.record.consistent
+ @property
+ def available(self):
+ return self.exists and self['state'] != 'deleted'
+
def created(self):
ts = int(time.time())
self.posts['ctime'] = ts
@@ -160,7 +165,7 @@ class Resource(object):
if self.record is not None:
return self.record.get(prop)
- def diff(self, r):
+ def diff(self, r, out_r=None):
patch = {}
for name, prop in self.metadata.items():
if name == 'seqno' or prop.acl & (ACL.CALC | ACL.LOCAL):
@@ -171,6 +176,8 @@ class Resource(object):
seqno = meta.get('seqno')
if not ranges.contains(r, seqno):
continue
+ if out_r is not None:
+ ranges.include(out_r, seqno, seqno)
value = meta.get('value')
if isinstance(prop, Aggregated):
value_ = {}
@@ -178,6 +185,8 @@ class Resource(object):
agg_seqno = agg.pop('seqno')
if ranges.contains(r, agg_seqno):
value_[key] = agg
+ if out_r is not None:
+ ranges.include(out_r, agg_seqno, agg_seqno)
value = value_
patch[name] = {'mtime': meta['mtime'], 'value': value}
return patch
@@ -204,7 +213,7 @@ class Resource(object):
if prop.on_set is not None:
value = prop.on_set(self, value)
seqno = None
- if not prop.acl & ACL.LOCAL:
+ if self.post_seqno and not prop.acl & ACL.LOCAL:
seqno = meta['seqno'] = self.post_seqno
if seqno and isinstance(prop, Aggregated):
for agg in value.values():
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index c74a93e..a1bb75e 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -13,18 +13,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# pylint: disable-msg=W0611
+
import re
import logging
from contextlib import contextmanager
from sugar_network import toolkit
from sugar_network.db.metadata import Aggregated
-from sugar_network.toolkit.router import ACL, route, fallbackroute
+from sugar_network.toolkit.router import ACL, File
+from sugar_network.toolkit.router import route, postroute, fallbackroute
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, parcel, enforce
+from sugar_network.toolkit import http, parcel, ranges, enforce
_GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$')
+_GROUPED_DIFF_LIMIT = 1024
_logger = logging.getLogger('db.routes')
@@ -35,6 +39,17 @@ class Routes(object):
this.volume = self.volume = volume
self._find_limit = find_limit
+ @postroute
+ def postroute(self, result, exception):
+ request = this.request
+ if not request.guid:
+ return result
+ pull = request.headers['pull']
+ if pull is None:
+ return result
+ this.response.content_type = 'application/octet-stream'
+ return self._object_diff(pull)
+
@route('POST', [None], acl=ACL.AUTH, mime_type='application/json')
def create(self):
with self._post(ACL.CREATE) as doc:
@@ -45,25 +60,6 @@ class Routes(object):
self.volume[this.request.resource].create(doc.posts)
return doc['guid']
- @route('GET', [None],
- arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
- mime_type='application/json')
- def find(self, reply, limit):
- self._preget()
- request = this.request
- if self._find_limit and limit > self._find_limit:
- _logger.warning('The find limit is restricted to %s',
- self._find_limit)
- request['limit'] = self._find_limit
- documents, total = self.volume[request.resource].find(
- not_state='deleted', **request)
- result = [self._postget(i, reply) for i in documents]
- return {'total': total, 'result': result}
-
- @route('GET', [None, None], cmd='exists', mime_type='application/json')
- def exists(self):
- return self.volume[this.request.resource][this.request.guid].exists
-
@route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update(self):
with self._post(ACL.WRITE) as doc:
@@ -88,11 +84,30 @@ class Routes(object):
# to make master-slave synchronization possible
directory = self.volume[this.request.resource]
doc = directory[this.request.guid]
- enforce(doc.exists, http.NotFound, 'Resource not found')
+ enforce(doc.available, http.NotFound, 'Resource not found')
doc.posts['state'] = 'deleted'
doc.updated()
directory.update(doc.guid, doc.posts, 'delete')
+ @route('GET', [None],
+ arguments={'offset': int, 'limit': int, 'reply': ('guid',)},
+ mime_type='application/json')
+ def find(self, reply, limit):
+ self._preget()
+ request = this.request
+ if self._find_limit and limit > self._find_limit:
+ _logger.warning('The find limit is restricted to %s',
+ self._find_limit)
+ request['limit'] = self._find_limit
+ documents, total = self.volume[request.resource].find(
+ not_state='deleted', **request)
+ result = [self._postget(i, reply) for i in documents]
+ return {'total': total, 'result': result}
+
+ @route('GET', [None, None], cmd='exists', mime_type='application/json')
+ def exists(self):
+ return self.volume[this.request.resource][this.request.guid].available
+
@route('GET', [None, None], arguments={'reply': list},
mime_type='application/json')
def get(self, reply):
@@ -103,8 +118,7 @@ class Routes(object):
reply.append(prop.name)
self._preget()
doc = self.volume[this.request.resource].get(this.request.guid)
- enforce(doc.exists and doc['state'] != 'deleted', http.NotFound,
- 'Resource not found')
+ enforce(doc.available, http.NotFound, 'Resource not found')
return self._postget(doc, reply)
@route('GET', [None, None, None], mime_type='application/json')
@@ -166,15 +180,66 @@ class Routes(object):
del authors[user]
directory.update(request.guid, {'author': authors})
- @route('GET', [None, None], cmd='clone')
- def clone(self):
- clone = self.volume.clone(this.request.resource, this.request.guid)
- return parcel.encode([('push', None, clone)])
+ @route('GET', [None], cmd='diff', mime_type='application/json')
+ def grouped_diff(self, key):
+ if not key:
+ key = 'guid'
+ in_r = this.request.headers['range'] or [[1, None]]
+ out_r = []
+ diff = set()
+
+ for doc in self.volume[this.request.resource].diff(in_r):
+ diff.add(doc.guid)
+ if len(diff) > _GROUPED_DIFF_LIMIT:
+ break
+ ranges.include(out_r, doc['seqno'], doc['seqno'])
+ doc.diff(in_r, out_r)
+
+ return out_r, list(diff)
+
+ @route('GET', [None, None], cmd='diff')
+ def object_diff(self):
+ return self._object_diff(this.request.headers['range'])
@fallbackroute('GET', ['blobs'])
def blobs(self):
return self.volume.blobs.get(this.request.guid)
+ def _object_diff(self, in_r):
+ request = this.request
+ doc = self.volume[request.resource][request.guid]
+ enforce(doc.exists, http.NotFound, 'Resource not found')
+
+ out_r = []
+ if in_r is None:
+ in_r = [[1, None]]
+ patch = doc.diff(in_r, out_r)
+ if not patch:
+ return parcel.encode([(None, None, [])], compresslevel=0)
+
+ diff = [{'resource': request.resource},
+ {'guid': request.guid, 'patch': patch},
+ ]
+
+ def add_blob(blob):
+ if not isinstance(blob, File):
+ return
+ seqno = int(blob.meta['x-seqno'])
+ ranges.include(out_r, seqno, seqno)
+ diff.append(blob)
+
+ for prop, meta in patch.items():
+ prop = doc.metadata[prop]
+ value = prop.reprcast(meta['value'])
+ if isinstance(prop, Aggregated):
+ for __, aggvalue in value:
+ add_blob(aggvalue)
+ else:
+ add_blob(value)
+ diff.append({'commit': out_r})
+
+ return parcel.encode([(None, None, diff)], compresslevel=0)
+
@contextmanager
def _post(self, access):
content = this.request.content
@@ -197,7 +262,7 @@ class Routes(object):
doc.posts[name] = prop.default
else:
doc = self.volume[this.request.resource][this.request.guid]
- enforce(doc.exists, 'Resource not found')
+ enforce(doc.available, 'Resource not found')
this.resource = doc
def teardown(new, old):
@@ -244,7 +309,7 @@ class Routes(object):
def _useradd(self, authors, user, role):
props = {}
user_doc = self.volume['user'][user]
- if user_doc.exists:
+ if user_doc.available:
props['name'] = user_doc['name']
role |= ACL.INSYSTEM
else:
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 295fc02..382176c 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -19,7 +19,6 @@ from copy import deepcopy
from os.path import exists, join, abspath
from sugar_network import toolkit
-from sugar_network.db.metadata import Blob
from sugar_network.db.directory import Directory
from sugar_network.db.index import IndexWriter
from sugar_network.db.blobs import Blobs
@@ -64,6 +63,13 @@ class Volume(dict):
def root(self):
return self._root
+ @property
+ def empty(self):
+ for directory in self.values():
+ if not directory.empty:
+ return False
+ return True
+
def close(self):
"""Close operations with the server."""
_logger.info('Closing documents in %r', self._root)
@@ -90,19 +96,13 @@ class Volume(dict):
for resource, directory in self.items():
if one_way and directory.resource.one_way:
continue
- directory.commit()
yield {'resource': resource}
- for start, end in r:
- query = 'seqno:%s..' % start
- if end:
- query += str(end)
- docs, __ = directory.find(query=query, order_by='seqno')
- for doc in docs:
- patch = doc.diff(include)
- if patch:
- yield {'guid': doc.guid, 'patch': patch}
- found = True
- last_seqno = max(last_seqno, doc['seqno'])
+ for doc in directory.diff(r):
+ patch = doc.diff(include)
+ if patch:
+ yield {'guid': doc.guid, 'patch': patch}
+ found = True
+ last_seqno = max(last_seqno, doc['seqno'])
if blobs:
for blob in self.blobs.diff(include):
seqno = int(blob.meta.pop('x-seqno'))
@@ -124,27 +124,16 @@ class Volume(dict):
ranges.exclude(r, None, last_seqno)
yield {'commit': commit_r}
- def clone(self, resource, guid):
- doc = self[resource][guid]
- patch = doc.diff([[1, None]])
- if not patch:
- return
- for name, prop in self[resource].metadata.items():
- if isinstance(prop, Blob) and name in patch:
- yield self.blobs.get(patch[name]['value'])
- yield {'resource': resource}
- yield {'guid': guid, 'patch': patch}
-
- def patch(self, records):
+ def patch(self, records, shift_seqno=True):
directory = None
committed = []
- seqno = None
+ seqno = None if shift_seqno else False
for record in records:
if isinstance(record, File):
if seqno is None:
seqno = self.seqno.next()
- self.blobs.patch(record, seqno)
+ self.blobs.patch(record, seqno or 0)
continue
resource = record.get('resource')
if resource:
diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py
index c6b3321..3f6aef1 100644
--- a/sugar_network/model/__init__.py
+++ b/sugar_network/model/__init__.py
@@ -199,7 +199,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
this.call(method='POST', path=['context'], content=context_meta,
principal=principal)
else:
- enforce(doc.exists, http.NotFound, 'No context')
+ enforce(doc.available, http.NotFound, 'No context')
enforce(context_type in doc['type'],
http.BadRequest, 'Inappropriate bundle type')
diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py
index 78df790..9153552 100644
--- a/sugar_network/model/context.py
+++ b/sugar_network/model/context.py
@@ -21,10 +21,6 @@ from sugar_network.toolkit import svg_to_png
class Context(db.Resource):
- @db.indexed_property(db.List, prefix='P', default=[])
- def pins(self, value):
- return value
-
@db.indexed_property(db.List, prefix='T',
subtype=db.Enum(model.CONTEXT_TYPES))
def type(self, value):
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index eda26dc..8012853 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -17,7 +17,7 @@ import logging
from sugar_network.toolkit.router import route
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import coroutine
+from sugar_network.toolkit import coroutine, http
_logger = logging.getLogger('model.routes')
@@ -30,9 +30,9 @@ class FrontRoutes(object):
this.broadcast = self._broadcast
this.localcast = self._broadcast
- @route('GET', mime_type='text/html')
+ @route('GET')
def hello(self):
- return _HELLO_HTML
+ raise http.Redirect('http://wiki.sugarlabs.org/go/Sugar_Network/API')
@route('OPTIONS')
def options(self):
@@ -86,7 +86,7 @@ class FrontRoutes(object):
@route('GET', ['favicon.ico'])
def favicon(self):
- return this.volume.blobs.get('favicon.ico')
+ return this.volume.blobs.get('assets/favicon.ico')
def _broadcast(self, event):
_logger.debug('Broadcast event: %r', event)
@@ -97,10 +97,3 @@ class FrontRoutes(object):
coroutine.select([rfile.fileno()], [], [])
finally:
self._spooler.notify_all(rfile)
-
-
-_HELLO_HTML = """\
-<h2>Welcome to Sugar Network API!</h2>
-Visit the <a href="http://wiki.sugarlabs.org/go/Sugar_Network/API">
-Sugar Labs Wiki</a> to learn how it can be used.
-"""
diff --git a/sugar_network/node/auth.py b/sugar_network/node/auth.py
index 27b334c..00054f5 100644
--- a/sugar_network/node/auth.py
+++ b/sugar_network/node/auth.py
@@ -17,7 +17,7 @@ import time
import hashlib
import logging
from ConfigParser import ConfigParser
-from os.path import join, dirname, exists, expanduser, abspath
+from os.path import join, exists
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import pylru, http, enforce
@@ -89,7 +89,7 @@ class SugarAuth(object):
signature = creds['signature']
nonce = int(creds['nonce'])
user = this.volume['user'][login]
- enforce(user.exists, Unauthorized, 'Principal does not exist')
+ enforce(user.available, Unauthorized, 'Principal does not exist')
key = RSA.load_pub_key_bio(BIO.MemoryBuffer(str(user['pubkey'])))
data = hashlib.sha1('%s:%s' % (login, nonce)).digest()
enforce(key.verify(data, signature.decode('hex')),
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index b1cb401..144dab0 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -181,7 +181,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None,
if context in context_clauses:
return context_clauses[context]
context = volume['context'][context]
- enforce(context.exists, http.NotFound, 'Context not found')
+ enforce(context.available, http.NotFound, 'Context not found')
releases = context['releases']
clause = []
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index 4457b2f..ac8a840 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -13,15 +13,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# pylint: disable-msg=W0611
+
import logging
from os.path import join
from sugar_network import db
from sugar_network.model import FrontRoutes, load_bundle
from sugar_network.node import model
-# pylint: disable-msg=W0611
-from sugar_network.toolkit.router import route, postroute, ACL, File
-from sugar_network.toolkit.router import Request, fallbackroute, preroute
+from sugar_network.toolkit.router import ACL, File
+from sugar_network.toolkit.router import route, fallbackroute, preroute
from sugar_network.toolkit.spec import parse_requires, parse_version
from sugar_network.toolkit.bundle import Bundle
from sugar_network.toolkit.coroutine import this
@@ -83,7 +84,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
return {'guid': self.guid,
'seqno': {
'db': self.volume.seqno.value,
- 'releases': self.volume.releases_seqno.value,
+ 'releases': self.volume.release_seqno.value,
},
}
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index 675c25f..7585e29 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -464,6 +464,35 @@ class NamedTemporaryFile(object):
return getattr(self._file, name)
+class Variable(list):
+
+ def __init__(self, default=None):
+ list.__init__(self, [default])
+
+ @property
+ def value(self):
+ return list.__getitem__(self, 0)
+
+ @value.setter
+ def value(self, value):
+ list.__setitem__(self, 0, value)
+
+ def __contains__(self, key):
+ return key in list.__getitem__(self, 0)
+
+ def __getitem__(self, key):
+ return list.__getitem__(self, 0).get(key)
+
+ def __setitem__(self, key, value):
+ list.__getitem__(self, 0)[key] = value
+
+ def __delitem__(self, key):
+ del list.__getitem__(self, 0)[key]
+
+ def __getattr__(self, name):
+ return getattr(list.__getitem__(self, 0), name)
+
+
class Bin(object):
"""Store variable in a file."""
@@ -471,10 +500,7 @@ class Bin(object):
self._path = abspath(path)
self.value = default_value
- if exists(self._path):
- with file(self._path) as f:
- self.value = json.load(f)
- else:
+ if not self.reset():
self.commit()
@property
@@ -491,6 +517,13 @@ class Bin(object):
f.flush()
os.fsync(f.fileno())
+ def reset(self):
+ if not exists(self._path):
+ return False
+ with file(self._path) as f:
+ self.value = json.load(f)
+ return True
+
def __enter__(self):
return self.value
@@ -535,6 +568,30 @@ class Seqno(Bin):
return self.value
+class CaseInsensitiveDict(dict):
+
+ def __contains__(self, key):
+ return dict.__contains__(self, key.lower())
+
+ def __getitem__(self, key):
+ return self.get(key.lower())
+
+ def __setitem__(self, key, value):
+ return self.set(key.lower(), value)
+
+ def __delitem__(self, key):
+ self.remove(key.lower())
+
+ def get(self, key, default=None):
+ return dict.get(self, key, default)
+
+ def set(self, key, value):
+ dict.__setitem__(self, key, value)
+
+ def remove(self, key):
+ dict.__delitem__(self, key)
+
+
class Pool(object):
"""Stack that keeps its iterators correct after changing content."""
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 0ebee86..0cbd535 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -267,6 +267,7 @@ class Connection(object):
value = request.environ.get(env_key)
if value is not None:
headers[key] = value
+ headers.update(request.headers)
path = request.path
while True:
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py
index 9d583cd..edbbf02 100644
--- a/sugar_network/toolkit/parcel.py
+++ b/sugar_network/toolkit/parcel.py
@@ -46,7 +46,14 @@ _logger = logging.getLogger('parcel')
def decode(stream, limit=None):
_logger.debug('Decode %r stream limit=%r', stream, limit)
- stream = _UnzipStream(stream, limit)
+ if limit is not None:
+ limit -= 2
+ magic = stream.read(2)
+ enforce(len(magic) == 2, http.BadRequest, 'Malformed parcel')
+ if magic == '\037\213':
+ stream = _ZippedDecoder(stream, limit)
+ else:
+ stream = _Decoder(magic, stream, limit)
header = stream.read_record()
packet = _DecodeIterator(stream)
@@ -63,7 +70,11 @@ def encode(packets, limit=None, header=None, compresslevel=None,
_logger.debug('Encode %r packets limit=%r header=%r',
packets, limit, header)
- ostream = _ZipStream(compresslevel)
+ if compresslevel == 0:
+ ostream = _Encoder()
+ else:
+ ostream = _ZippedEncoder(compresslevel)
+
# In case of downloading blobs
# (?) reuse current `this.http`
this.http = http.Connection()
@@ -242,16 +253,10 @@ class _DecodeIterator(object):
pass
-class _ZipStream(object):
+class _Encoder(object):
- def __init__(self, compresslevel=None):
- if compresslevel is None:
- compresslevel = DEFAULT_COMPRESSLEVEL
- self._zipper = zlib.compressobj(compresslevel,
- zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
+ def __init__(self):
self._offset = 0
- self._size = 0
- self._crc = zlib.crc32('') & 0xffffffffL
def write_record(self, record, limit=None):
chunk = json.dumps(record) + '\n'
@@ -260,49 +265,58 @@ class _ZipStream(object):
return self.write(chunk)
def write(self, chunk):
+ chunk = self._encode(chunk)
+ if chunk:
+ self._offset += len(chunk)
+ return chunk
+
+ def flush(self):
+ chunk = self._flush()
+ self._offset += len(chunk)
+ return chunk
+
+ def _encode(self, chunk):
+ return chunk
+
+ def _flush(self):
+ return ''
+
+
+class _ZippedEncoder(_Encoder):
+
+ def __init__(self, compresslevel=None):
+ _Encoder.__init__(self)
+ if compresslevel is None:
+ compresslevel = DEFAULT_COMPRESSLEVEL
+ self._zipper = zlib.compressobj(compresslevel,
+ zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
+ self._size = 0
+ self._crc = zlib.crc32('') & 0xffffffffL
+
+ def _encode(self, chunk):
self._size += len(chunk)
self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
chunk = self._zipper.compress(chunk)
-
if self._offset == 0:
chunk = '\037\213' + '\010' + chr(0) + \
struct.pack('<L', long(time.time())) + \
'\002' + '\377' + \
chunk
self._offset = _ZLIB_WBITS_SIZE
- if chunk:
- self._offset += len(chunk)
-
return chunk
- def flush(self):
- chunk = self._zipper.flush() + \
+ def _flush(self):
+ return self._zipper.flush() + \
struct.pack('<L', self._crc) + \
struct.pack('<L', self._size & 0xffffffffL)
- self._offset += len(chunk)
- return chunk
-class _UnzipStream(object):
+class _Decoder(object):
- def __init__(self, stream, limit):
+ def __init__(self, prefix, stream, limit):
+ self._buffer = prefix
self._stream = stream
self._limit = limit
- self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
- self._crc = zlib.crc32('') & 0xffffffffL
- self._size = 0
- self._buffer = ''
-
- if self._limit is not None:
- self._limit -= 10
- magic = stream.read(2)
- enforce(magic == '\037\213', http.BadRequest,
- 'Not a gzipped file')
- enforce(ord(stream.read(1)) == 8, http.BadRequest,
- 'Unknown compression method')
- enforce(ord(stream.read(1)) == 0, http.BadRequest,
- 'Gzip flags should be empty')
- stream.read(6) # Ignore the rest of header
def read_record(self):
while True:
@@ -328,20 +342,41 @@ class _UnzipStream(object):
if self._limit is not None:
size = min(size, self._limit)
chunk = self._stream.read(size)
+ if chunk and self._limit is not None:
+ self._limit -= len(chunk)
+ return self._decode(chunk)
+
+ def _decode(self, chunk):
+ self._buffer += chunk
+ return bool(self._buffer)
+
+class _ZippedDecoder(_Decoder):
+
+ def __init__(self, stream, limit):
+ _Decoder.__init__(self, '', stream, limit)
+ self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
+ self._crc = zlib.crc32('') & 0xffffffffL
+ self._size = 0
+
+ if self._limit is not None:
+ self._limit -= 8
+ enforce(ord(stream.read(1)) == 8, http.BadRequest,
+ 'Unknown compression method')
+ enforce(ord(stream.read(1)) == 0, http.BadRequest,
+ 'Gzip flags should be empty')
+ stream.read(6) # Ignore the rest of header
+
+ def _decode(self, chunk):
if chunk:
- if self._limit is not None:
- self._limit -= len(chunk)
self._add_to_buffer(self._unzipper.decompress(chunk))
return True
-
enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest,
'Malformed gzipped file')
crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0]
enforce(crc == self._crc, http.BadRequest, 'CRC check failed')
size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0]
enforce(size == self._size, http.BadRequest, 'Incorrect length')
-
return self._add_to_buffer(self._unzipper.flush())
def _add_to_buffer(self, chunk):
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index e9e91fd..f4b23ce 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -140,6 +140,7 @@ class Request(dict):
else:
dict.__setitem__(self, key, value)
self.environ = environ
+ self.headers = _RequestHeaders(self.environ)
if method:
self.environ['REQUEST_METHOD'] = method
@@ -312,35 +313,15 @@ class Request(dict):
(self.method, self.path, self.cmd, dict(self))
-class CaseInsensitiveDict(dict):
-
- def __contains__(self, key):
- return dict.__contains__(self, key.lower())
-
- def __getitem__(self, key):
- return self.get(key.lower())
-
- def __setitem__(self, key, value):
- return self.set(key.lower(), value)
-
- def __delitem__(self, key):
- self.remove(key.lower())
-
- def get(self, key, default=None):
- return dict.get(self, key, default)
-
- def set(self, key, value):
- dict.__setitem__(self, key, value)
-
- def remove(self, key):
- dict.__delitem__(self, key)
-
-
-class Response(CaseInsensitiveDict):
+class Response(toolkit.CaseInsensitiveDict):
status = '200 OK'
relocations = 0
+ def __init__(self):
+ toolkit.CaseInsensitiveDict.__init__(self)
+ self.headers = _ResponseHeaders(self)
+
@property
def content_length(self):
return int(self.get('content-length') or '0')
@@ -392,7 +373,7 @@ class File(str):
pass
def __new__(cls, path=None, digest=None, meta=None):
- meta = CaseInsensitiveDict(meta or [])
+ meta = toolkit.CaseInsensitiveDict(meta or [])
url = ''
if meta:
@@ -568,7 +549,7 @@ class Router(object):
raise
finally:
for i in self._postroutes:
- i(result, exception)
+ result = i(result, exception)
return result
@@ -915,4 +896,39 @@ class _Route(object):
return '%s /%s (%s)' % (self.method, path, self.callback.__name__)
+class _RequestHeaders(dict):
+
+ def __init__(self, environ):
+ dict.__init__(self)
+ self._environ = environ
+
+ def __contains__(self, key):
+ return 'HTTP_X_%s' % key.upper() in self._environ
+
+ def __getitem__(self, key):
+ value = self._environ.get('HTTP_X_%s' % key.upper())
+ if value is not None:
+ return json.loads(value)
+
+ def __setitem__(self, key, value):
+ dict.__setitem__(self, 'x-%s' % key, json.dumps(value))
+
+
+class _ResponseHeaders(object):
+
+ def __init__(self, headers):
+ self._headers = headers
+
+ def __contains__(self, key):
+ return 'x-%s' % key.lower() in self._headers
+
+ def __getitem__(self, key):
+ value = self._headers.get('x-%s' % key.lower())
+ if value is not None:
+ return json.loads(value)
+
+ def __setitem__(self, key, value):
+ self._headers.set('x-%s' % key.lower(), json.dumps(value))
+
+
File.AWAY = File(None)
diff --git a/tests/__init__.py b/tests/__init__.py
index e1c3222..32bc3ea 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -24,10 +24,12 @@ from sugar_network.toolkit import http, mountpoints, Option, gbus, i18n, languag
from sugar_network.toolkit.router import Router, Request, Response
from sugar_network.toolkit.coroutine import this
from sugar_network.client import IPCConnection, journal, routes as client_routes, model as client_model
+from sugar_network.client.model import Volume as LocalVolume
from sugar_network.client.injector import Injector
from sugar_network.client.routes import ClientRoutes
from sugar_network.client.auth import SugarCreds
from sugar_network import db, client, node, toolkit, model
+from sugar_network.db import routes as db_routes
from sugar_network.model.user import User
from sugar_network.model.context import Context
from sugar_network.node.model import Context as MasterContext
@@ -103,6 +105,8 @@ class Test(unittest.TestCase):
client.cache_lifetime.value = 0
client.keyfile.value = join(root, 'data', UID)
client_routes._RECONNECT_TIMEOUT = 0
+ client_routes._SYNC_TIMEOUT = 30
+ db_routes._GROUPED_DIFF_LIMIT = 1024
journal._ds_root = tmpdir + '/datastore'
mountpoints._connects.clear()
mountpoints._found.clear()
@@ -141,6 +145,7 @@ class Test(unittest.TestCase):
this.volume = None
this.call = None
this.broadcast = lambda x: x
+ this.localcast = lambda x: x
this.injector = None
this.principal = None
@@ -287,12 +292,14 @@ class Test(unittest.TestCase):
this.call = self.node_router.call
return self.node_volume
- def fork_master(self, classes=None, routes=MasterRoutes):
+ def fork_master(self, classes=None, routes=MasterRoutes, cb=None):
if classes is None:
classes = master.RESOURCES
def node():
volume = NodeVolume('master', classes)
+ if cb is not None:
+ cb(volume)
node = coroutine.WSGIServer(('127.0.0.1', 7777), Router(routes(volume=volume, auth=SugarAuth('master'))))
node.serve_forever()
@@ -314,10 +321,7 @@ class Test(unittest.TestCase):
def start_online_client(self, classes=None):
self.fork_master(classes)
this.injector = Injector('client/cache')
- if classes:
- home_volume = db.Volume('client', classes)
- else:
- home_volume = client_model.Volume('client')
+ home_volume = LocalVolume('client', classes)
self.client_routes = ClientRoutes(home_volume, SugarCreds(client.keyfile.value))
self.client_routes.connect(client.api.value)
self.wait_for_events(self.client_routes, event='inline', state='online').wait()
diff --git a/tests/units/client/injector.py b/tests/units/client/injector.py
index 7170758..b266cda 100755
--- a/tests/units/client/injector.py
+++ b/tests/units/client/injector.py
@@ -14,6 +14,7 @@ from __init__ import tests
from sugar_network import db, client
from sugar_network.client import Connection, keyfile, api, packagekit, injector as injector_, model
from sugar_network.client.injector import _PreemptivePool, Injector
+from sugar_network.client.model import Volume as LocalVolume
from sugar_network.client.auth import SugarCreds
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, lsb_release
@@ -349,7 +350,8 @@ class InjectorTest(tests.Test):
assert not exists('releases/2')
def test_solve(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -382,7 +384,8 @@ class InjectorTest(tests.Test):
self.assertEqual([client.api.value, 'stable', 0, solution], json.load(file('client/solutions/context')))
def test_solve_FailInOffline(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = None
@@ -403,7 +406,8 @@ class InjectorTest(tests.Test):
self.assertRaises(http.ServiceUnavailable, injector._solve, 'context', 'stable')
def test_solve_ReuseCachedSolution(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -420,13 +424,14 @@ class InjectorTest(tests.Test):
]))), cmd='submit', initial=True)
assert 'context' in injector._solve('context', 'stable')
- volume['context'].delete('context')
+ conn.delete(['context', 'context'])
assert 'context' in injector._solve('context', 'stable')
os.unlink('client/solutions/context')
self.assertRaises(http.NotFound, injector._solve, 'context', 'stable')
def test_solve_InvalidateCachedSolution(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = 'http://127.0.0.1:7777'
@@ -492,7 +497,8 @@ class InjectorTest(tests.Test):
self.assertEqual(['http://localhost:7777', 'stable', 2], json.load(file('client/solutions/context'))[:-1])
def test_solve_ForceUsingStaleCachedSolutionInOffline(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -519,7 +525,8 @@ class InjectorTest(tests.Test):
self.assertRaises(http.ServiceUnavailable, injector._solve, 'context', 'stable')
def test_download_SetExecPermissions(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -552,7 +559,8 @@ class InjectorTest(tests.Test):
assert not os.access(path + 'test/file2', os.X_OK)
def test_checkin(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -603,7 +611,8 @@ class InjectorTest(tests.Test):
[i for i in injector.checkin('context')])
def test_checkin_PreemptivePool(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -660,7 +669,8 @@ class InjectorTest(tests.Test):
self.assertEqual([], this.volume['context']['context']['pins'])
def test_checkin_Refresh(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -694,7 +704,8 @@ class InjectorTest(tests.Test):
assert exists('client/releases/%s' % release2)
def test_launch(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -742,7 +753,8 @@ class InjectorTest(tests.Test):
[i for i in injector.launch('context', activity_id='activity_id')])
def test_launch_PreemptivePool(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -783,7 +795,8 @@ class InjectorTest(tests.Test):
self.assertEqual(len(activity_info), injector._pool._du)
def test_launch_DonntAcquireCheckins(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector('client')
injector.api = client.api.value
@@ -810,7 +823,8 @@ class InjectorTest(tests.Test):
assert injector._pool._du == 0
def test_launch_RefreshCheckins(self):
- self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector(tests.tmpdir + '/client')
injector.api = client.api.value
@@ -862,7 +876,26 @@ class InjectorTest(tests.Test):
self.assertEqual('2', file('client/releases/%s/output' % release2).read())
def test_launch_InstallDeps(self):
- volume = self.start_master()
+
+ def master_cb(volume):
+ distro = '%s-%s' % (lsb_release.distributor_id(), lsb_release.release())
+ volume['context'].create({
+ 'guid': 'package1', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': {
+ 'resolves': {
+ distro: {'version': [[1], 0], 'packages': ['pkg1', 'pkg2']},
+ },
+ },
+ })
+ volume['context'].create({
+ 'guid': 'package2', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': {
+ 'resolves': {
+ distro: {'version': [[1], 0], 'packages': ['pkg3', 'pkg4']},
+ },
+ },
+ })
+
+ self.fork_master(cb=master_cb)
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector(tests.tmpdir + '/client')
injector.api = client.api.value
@@ -878,21 +911,6 @@ class InjectorTest(tests.Test):
'license = Public Domain',
'requires = package1; package2',
]))), cmd='submit', initial=True)
- distro = '%s-%s' % (lsb_release.distributor_id(), lsb_release.release())
- volume['context'].create({
- 'guid': 'package1', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': {
- 'resolves': {
- distro: {'version': [[1], 0], 'packages': ['pkg1', 'pkg2']},
- },
- },
- })
- volume['context'].create({
- 'guid': 'package2', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': {
- 'resolves': {
- distro: {'version': [[1], 0], 'packages': ['pkg3', 'pkg4']},
- },
- },
- })
packages = []
self.override(packagekit, 'install', lambda names: packages.extend(names))
@@ -902,14 +920,15 @@ class InjectorTest(tests.Test):
self.assertEqual(['pkg1', 'pkg2', 'pkg3', 'pkg4'], sorted(packages))
def test_launch_Document(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector(tests.tmpdir + '/client')
injector.api = client.api.value
injector.seqno = 1
- volume['context'].create({'guid': 'book', 'type': ['book'], 'title': {}, 'summary': {}, 'description': {}})
- book = conn.upload(['context'], 'book', cmd='submit', context='book', version='1', license='Public Domain')
+ book_context = conn.post(['context'], {'type': ['book'], 'title': {}, 'summary': {}, 'description': {}})
+ book = conn.upload(['context'], 'book', cmd='submit', context=book_context, version='1', license='Public Domain')
app = conn.upload(['context'], self.zips(
('topdir/activity/activity.info', '\n'.join([
@@ -929,21 +948,22 @@ class InjectorTest(tests.Test):
self.assertEqual(
{'event': 'launch', 'state': 'exit'},
- [i for i in injector.launch('book', activity_id='activity_id', app='app')][-1])
+ [i for i in injector.launch(book_context, activity_id='activity_id', app='app')][-1])
self.assertEqual(
'-b app -a activity_id -u %s/client/releases/%s' % (tests.tmpdir, book),
file('client/releases/%s/output' % app).read())
def test_launch_DocumentWithDetectingAppByMIMEType(self):
- volume = self.start_master()
+ self.fork_master()
+ this.volume = LocalVolume('client')
conn = Connection(creds=SugarCreds(keyfile.value))
injector = Injector(tests.tmpdir + '/client')
injector.api = client.api.value
injector.seqno = 1
- volume['context'].create({'guid': 'book', 'type': ['book'], 'title': {}, 'summary': {}, 'description': {}})
- book = conn.upload(['context'], 'book', cmd='submit', context='book', version='1', license='Public Domain')
+ book_context = conn.post(['context'], {'type': ['book'], 'title': {}, 'summary': {}, 'description': {}})
+ book = conn.upload(['context'], 'book', cmd='submit', context=book_context, version='1', license='Public Domain')
app = conn.upload(['context'], self.zips(
('topdir/activity/activity.info', '\n'.join([
@@ -964,7 +984,7 @@ class InjectorTest(tests.Test):
self.override(injector_, '_app_by_mimetype', lambda mime_type: 'app')
self.assertEqual(
{'event': 'launch', 'state': 'exit'},
- [i for i in injector.launch('book', activity_id='activity_id')][-1])
+ [i for i in injector.launch(book_context, activity_id='activity_id')][-1])
self.assertEqual(
'-b app -a activity_id -u %s/client/releases/%s' % (tests.tmpdir, book),
diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py
index 6072571..9145b42 100755
--- a/tests/units/client/routes.py
+++ b/tests/units/client/routes.py
@@ -12,7 +12,7 @@ from os.path import exists
from __init__ import tests
from sugar_network import db, client, toolkit
-from sugar_network.client import journal, IPCConnection, cache_limit, cache_lifetime, api, injector, routes
+from sugar_network.client import journal, Connection, IPCConnection, cache_limit, cache_lifetime, api, injector, routes
from sugar_network.client.model import Volume
from sugar_network.client.injector import Injector
from sugar_network.client.routes import ClientRoutes
@@ -81,25 +81,19 @@ class RoutesTest(tests.Test):
]),
], header={'to': '127.0.0.1:7777', 'from': 'slave'})), params={'cmd': 'push'})
- self.assertEqual([
- {'guid': '1'},
- {'guid': '2'},
- ],
- ipc.get(['context'], query='йцу')['result'])
- self.assertEqual([
- {'guid': '1'},
- {'guid': '2'},
- ],
- ipc.get(['context'], query='qwe')['result'])
+ self.assertEqual(
+ sorted(['1', '2']),
+ sorted([i['guid'] for i in ipc.get(['context'], query='йцу')['result']]))
+ self.assertEqual(
+ sorted(['1', '2']),
+ sorted([i['guid'] for i in ipc.get(['context'], query='qwe')['result']]))
- self.assertEqual([
- {'guid': '2'},
- ],
- ipc.get(['context'], query='йцукен')['result'])
- self.assertEqual([
- {'guid': '2'},
- ],
- ipc.get(['context'], query='qwerty')['result'])
+ self.assertEqual(
+ sorted(['2']),
+ sorted([i['guid'] for i in ipc.get(['context'], query='йцукен')['result']]))
+ self.assertEqual(
+ sorted(['2']),
+ sorted([i['guid'] for i in ipc.get(['context'], query='qwerty')['result']]))
def test_LanguagesFallbackInRequests(self):
self.start_online_client()
@@ -340,14 +334,12 @@ class RoutesTest(tests.Test):
self.assertEqual(
blob,
ipc.request('GET', ['context', guid, 'logo']).content)
- self.assertEqual({
- 'logo': 'http://127.0.0.1:7777/blobs/%s' % digest,
- },
- ipc.get(['context', guid], reply=['logo']))
- self.assertEqual([{
- 'logo': 'http://127.0.0.1:7777/blobs/%s' % digest,
- }],
- ipc.get(['context'], reply=['logo'])['result'])
+ self.assertEqual(
+ 'http://127.0.0.1:7777/blobs/%s' % digest,
+ ipc.get(['context', guid], reply=['logo'])['logo'])
+ self.assertEqual(
+ ['http://127.0.0.1:7777/blobs/%s' % digest],
+ [i['logo'] for i in ipc.get(['context'], reply=['logo'])['result']])
def test_OnlinePins(self):
home_volume = self.start_online_client()
@@ -388,13 +380,9 @@ class RoutesTest(tests.Test):
'description': 'description',
})
- self.assertEqual(sorted([
- {'guid': guid1, 'title': '1', 'pins': []},
- {'guid': guid2, 'title': '2', 'pins': []},
- {'guid': guid3, 'title': '3', 'pins': []},
- {'guid': guid4, 'title': '4', 'pins': []},
- ]),
- sorted(ipc.get(['context'], reply=['guid', 'title', 'pins'])['result']))
+ self.assertEqual(
+ sorted([(guid1, []), (guid2, []), (guid3, []), (guid4, [])]),
+ sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']]))
self.assertEqual([
],
ipc.get(['context'], reply=['guid', 'title'], pins='favorite')['result'])
@@ -420,34 +408,25 @@ class RoutesTest(tests.Test):
home_volume['context'].update(guid2, {'title': {i18n.default_lang(): '2_'}})
home_volume['context'].update(guid3, {'title': {i18n.default_lang(): '3_'}})
- self.assertEqual(sorted([
- {'guid': guid1, 'title': '1', 'pins': ['favorite']},
- {'guid': guid2, 'title': '2', 'pins': ['checkin', 'favorite']},
- {'guid': guid3, 'title': '3', 'pins': ['checkin']},
- {'guid': guid4, 'title': '4', 'pins': []},
- ]),
- sorted(ipc.get(['context'], reply=['guid', 'title', 'pins'])['result']))
+ self.assertEqual(
+ sorted([(guid1, ['favorite']), (guid2, ['checkin', 'favorite']), (guid3, ['checkin']), (guid4, [])]),
+ sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']]))
self.assertEqual([
{'guid': guid1, 'title': '1_'},
{'guid': guid2, 'title': '2_'},
],
ipc.get(['context'], reply=['guid', 'title'], pins='favorite')['result'])
- self.assertEqual([
- {'guid': guid2, 'title': '2_'},
- {'guid': guid3, 'title': '3_'},
- ],
- ipc.get(['context'], reply=['guid', 'title'], pins='checkin')['result'])
+
+ self.assertEqual(
+ sorted([(guid2, '2_'), (guid3, '3_')]),
+ sorted([(i['guid'], i['title']) for i in ipc.get(['context'], reply=['guid', 'title'], pins='checkin')['result']]))
ipc.delete(['context', guid1], cmd='favorite')
ipc.delete(['context', guid2], cmd='checkin')
- self.assertEqual(sorted([
- {'guid': guid1, 'pins': []},
- {'guid': guid2, 'pins': ['favorite']},
- {'guid': guid3, 'pins': ['checkin']},
- {'guid': guid4, 'pins': []},
- ]),
- sorted(ipc.get(['context'], reply=['guid', 'pins'])['result']))
+ self.assertEqual(
+ sorted([(guid1, []), (guid2, ['favorite']), (guid3, ['checkin']), (guid4, [])]),
+ sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']]))
self.assertEqual([
{'guid': guid2, 'pins': ['favorite']},
],
@@ -496,12 +475,9 @@ class RoutesTest(tests.Test):
{'event': 'checkin', 'state': 'ready'},
],
[i for i in ipc.put(['context', '2'], None, cmd='checkin')])
- self.assertEqual([
- {'guid': '1', 'pins': ['favorite']},
- {'guid': '2', 'pins': ['checkin']},
- {'guid': '3', 'pins': []},
- ],
- ipc.get(['context'], reply=['guid', 'pins'])['result'])
+ self.assertEqual(
+ sorted([('1', ['favorite']), ('2', ['checkin']), ('3', [])]),
+ sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['guid', 'pins'])['result']]))
self.stop_master()
self.wait_for_events(event='inline', state='offline').wait()
@@ -692,8 +668,7 @@ class RoutesTest(tests.Test):
ipc = IPCConnection()
self.assertEqual([
- {'event': 'checkin', 'state': 'solve'},
- {'error': 'Context not found', 'event': 'failure', 'exception': 'NotFound'},
+ {'error': 'Resource not found', 'event': 'failure', 'exception': 'NotFound'},
],
[i for i in ipc.put(['context', 'context'], None, cmd='checkin')])
@@ -862,11 +837,10 @@ class RoutesTest(tests.Test):
self.assertEqual('done', events[-1]['event'])
guid = events[-1]['guid']
- self.assertEqual({
- 'context': 'context',
- 'error': 'error',
- },
- ipc.get(['report', guid], reply=['context', 'error']))
+ report = ipc.get(['report', guid], reply=['context', 'error'])
+ self.assertEqual('context', report['context'])
+ self.assertEqual('error', report['error'])
+
self.assertEqual(sorted([
'content1',
'content2',
@@ -1016,7 +990,7 @@ class RoutesTest(tests.Test):
assert time.time() - ts >= 2
def kill():
- coroutine.sleep(.5)
+ coroutine.sleep(.4)
self.waitpid(node_pid)
coroutine.spawn(kill)
@@ -1095,13 +1069,114 @@ class RoutesTest(tests.Test):
self.assertEqual([{'event': 'pong'}], events)
assert Routes.subscribe_tries > 2
+ def test_PullCheckinsOnGets(self):
+ local_volume = self.start_online_client()
+ local = IPCConnection()
+ remote = Connection(creds=SugarCreds(client.keyfile.value))
+
+ self.assertEqual([[1, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': '1',
+ 'summary': '',
+ 'description': '',
+ })
+ local.put(['context', guid], None, cmd='favorite')
+ self.assertEqual('1', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ coroutine.sleep(1.1)
+
+ self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+ remote.put(['context', guid, 'title'], '2')
+ self.assertEqual('2', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+ self.assertEqual('2', local.get(['context'], reply='title')['result'][0]['title'])
+ coroutine.sleep(.1)
+ self.assertEqual('2', remote.get(['context', guid, 'title']))
+ self.assertEqual('2', local.get(['context', guid])['title'])
+ self.assertEqual([[1, 1], [7, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+ def test_PullCheckinsOnGettingOnline(self):
+ routes._RECONNECT_TIMEOUT = 1
+ routes._SYNC_TIMEOUT = 0
+ local_volume = self.start_online_client()
+ local = IPCConnection()
+ remote = Connection(creds=SugarCreds(client.keyfile.value))
+
+ self.assertEqual([[1, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': '1',
+ 'summary': '',
+ 'description': '',
+ })
+ local.put(['context', guid], None, cmd='favorite')
+ self.assertEqual('1', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ coroutine.sleep(1.1)
+
+ remote.put(['context', guid, 'title'], '2')
+ self.assertEqual('2', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ self.stop_master()
+ self.wait_for_events(event='inline', state='offline').wait()
+ self.fork_master()
+ self.wait_for_events(event='sync', state='pull').wait()
+
+ self.assertEqual('2', local.get(['context', guid])['title'])
+ self.assertEqual([[1, 1], [7, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ def test_PullCheckinsOnUpdates(self):
+ local_volume = self.start_online_client()
+ local = IPCConnection()
+ remote = Connection(creds=SugarCreds(client.keyfile.value))
+
+ self.assertEqual([[1, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ guid = remote.post(['context'], {
+ 'type': 'activity',
+ 'title': '1',
+ 'summary': '1',
+ 'description': '',
+ })
+ local.put(['context', guid], None, cmd='favorite')
+ self.assertEqual('1', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ coroutine.sleep(1.1)
+ remote.put(['context', guid, 'title'], '2')
+ self.assertEqual('2', remote.get(['context', guid, 'title']))
+ self.assertEqual('1', remote.get(['context', guid, 'summary']))
+ self.assertEqual('1', local.get(['context', guid])['title'])
+ self.assertEqual('1', local.get(['context', guid])['summary'])
+ self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
+
+ local.put(['context', guid, 'summary'], '2')
+ self.assertEqual('2', remote.get(['context', guid, 'title']))
+ self.assertEqual('2', remote.get(['context', guid, 'summary']))
+ self.assertEqual('2', local.get(['context', guid])['title'])
+ self.assertEqual('2', local.get(['context', guid])['summary'])
+ self.assertEqual([[1, 1], [8, None]], self.client_routes._pull_r.value)
+ self.assertEqual(0, local_volume.seqno.value)
def ___test_CachedClientRoutes(self):
volume = db.Volume('client', RESOURCES, lazy_open=True)
diff --git a/tests/units/db/index.py b/tests/units/db/index.py
index cb144c6..c0072c1 100755
--- a/tests/units/db/index.py
+++ b/tests/units/db/index.py
@@ -420,14 +420,18 @@ class IndexTest(tests.Test):
post_stored = []
deleted = []
+ def pre_stored_cb(*args):
+ pre_stored.append(args)
+ return {}
+
db.store('1', {},
- lambda *args: pre_stored.append(args),
+ pre_stored_cb,
lambda *args: post_stored.append(args))
self.assertEqual(1, len(pre_stored))
self.assertEqual(1, len(post_stored))
db.store('1', {},
- lambda *args: pre_stored.append(args),
+ pre_stored_cb,
lambda *args: post_stored.append(args))
self.assertEqual(2, len(pre_stored))
self.assertEqual(2, len(post_stored))
diff --git a/tests/units/db/resource.py b/tests/units/db/resource.py
index 05aaddf..4bf80b7 100755
--- a/tests/units/db/resource.py
+++ b/tests/units/db/resource.py
@@ -483,6 +483,45 @@ class ResourceTest(tests.Test):
self.assertEqual('set2!', doc['prop1'])
self.assertEqual('set2!', doc['prop3'])
+ def test_diff_OutputRange(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop1(self, value):
+ return value
+
+ @db.stored_property()
+ def prop2(self, value):
+ return value
+
+ directory = Directory(tests.tmpdir, Document, IndexWriter, _SessionSeqno(), this.broadcast)
+
+ guid = directory.create({'prop1': '1', 'prop2': '1'})
+ self.utime('db', 0)
+
+ out_r = []
+ self.assertEqual({
+ 'guid': {'mtime': 0, 'value': guid},
+ 'prop1': {'mtime': 0, 'value': '1'},
+ 'prop2': {'mtime': 0, 'value': '1'},
+ },
+ directory[guid].diff([[1, None]], out_r))
+ self.assertEqual([[1, 1]], out_r)
+
+ directory.update(guid, {'prop1': '2'})
+ directory.update(guid, {'prop2': '2'})
+ self.utime('db', 0)
+
+ out_r = []
+ self.assertEqual({
+ 'guid': {'mtime': 0, 'value': guid},
+ 'prop1': {'mtime': 0, 'value': '2'},
+ 'prop2': {'mtime': 0, 'value': '2'},
+ },
+ directory[guid].diff([[1, None]], out_r))
+ self.assertEqual([[1, 3]], out_r)
+
class _SessionSeqno(object):
diff --git a/tests/units/db/routes.py b/tests/units/db/routes.py
index 4189502..5d5a547 100755
--- a/tests/units/db/routes.py
+++ b/tests/units/db/routes.py
@@ -3,6 +3,7 @@
import os
import sys
+import json
import time
import shutil
import hashlib
@@ -16,10 +17,11 @@ src_root = abspath(dirname(__file__))
from __init__ import tests
from sugar_network import db, toolkit
+from sugar_network.db import routes as db_routes
from sugar_network.model.user import User
from sugar_network.toolkit.router import Router, Request, Response, fallbackroute, ACL, File
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import coroutine, http, i18n
+from sugar_network.toolkit import coroutine, http, i18n, parcel
class RoutesTest(tests.Test):
@@ -1942,6 +1944,169 @@ class RoutesTest(tests.Test):
[{'event': 'delete', 'resource': 'document', 'guid': guid}],
events)
+ def test_ObjectDiff(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop1(self, value):
+ return value
+
+ @db.stored_property()
+ def prop2(self, value):
+ return value
+
+ @db.stored_property(db.Blob)
+ def prop3(self, value):
+ return value
+
+ @db.stored_property(db.Blob)
+ def prop4(self, value):
+ return value
+
+ volume = db.Volume('.', [Document])
+ router = Router(db.Routes(volume))
+
+ volume['document'].create({
+ 'guid': 'guid',
+ 'prop1': '1',
+ 'prop2': 2,
+ 'prop3': volume.blobs.post('333', '3/3').digest,
+ })
+ volume['document'].update('guid', {'prop4': volume.blobs.post('4444', '4/4').digest})
+ self.utime('db/document/gu/guid', 1)
+
+ patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff')])
+ self.assertEqual([(
+ {'packet': None}, [
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'guid': {'value': 'guid', 'mtime': 1},
+ 'prop1': {'value': '1', 'mtime': 1},
+ 'prop2': {'value': 2, 'mtime': 1},
+ 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1},
+ 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1},
+ }},
+ {'content-type': '4/4', 'content-length': '4', 'x-seqno': '3'},
+ {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'},
+ {'commit': [[1, 4]]},
+ ],
+ )],
+ [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))])
+
+ patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={
+ 'HTTP_X_RANGE': json.dumps([[1, 1]]),
+ })])
+ self.assertEqual([(
+ {'packet': None}, [],
+ )],
+ [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))])
+
+ patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={
+ 'HTTP_X_RANGE': json.dumps([[2, 2]]),
+ })])
+ self.assertEqual([(
+ {'packet': None}, [
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'guid': {'value': 'guid', 'mtime': 1},
+ 'prop1': {'value': '1', 'mtime': 1},
+ 'prop2': {'value': 2, 'mtime': 1},
+ 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1},
+ }},
+ {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'},
+ {'commit': [[1, 2]]},
+ ],
+ )],
+ [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))])
+
+ patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={
+ 'HTTP_X_RANGE': json.dumps([[3, 3]]),
+ })])
+ self.assertEqual([(
+ {'packet': None}, [],
+ )],
+ [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))])
+
+ patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={
+ 'HTTP_X_RANGE': json.dumps([[4, 4]]),
+ })])
+ self.assertEqual([(
+ {'packet': None}, [
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1},
+ }},
+ {'content-type': '4/4', 'content-length': '4', 'x-seqno': '3'},
+ {'commit': [[3, 4]]},
+ ],
+ )],
+ [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))])
+
+ def test_GroupedDiff(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop(self, value):
+ return value
+
+ volume = db.Volume('.', [Document])
+ router = Router(db.Routes(volume))
+
+ volume['document'].create({'guid': '1', 'prop': 'q'})
+ volume['document'].create({'guid': '2', 'prop': 'w'})
+ volume['document'].create({'guid': '3', 'prop': 'w'})
+ volume['document'].create({'guid': '4', 'prop': 'e'})
+ volume['document'].create({'guid': '5', 'prop': 'e'})
+ volume['document'].create({'guid': '6', 'prop': 'e'})
+ self.utime('db/document', 0)
+
+ self.assertEqual({
+ '1': [[1, 1]],
+ '2': [[2, 2]],
+ '3': [[3, 3]],
+ '4': [[4, 4]],
+ '5': [[5, 5]],
+ '6': [[6, 6]],
+ },
+ this.call(method='GET', path=['document'], cmd='diff'))
+
+ self.assertEqual({
+ 'q': [[1, 1]],
+ 'w': [[2, 3]],
+ 'e': [[4, 6]],
+ },
+ this.call(method='GET', path=['document'], cmd='diff', key='prop'))
+
+ def test_GroupedDiffLimit(self):
+ db_routes._GROUPED_DIFF_LIMIT = 2
+
+ class Document(db.Resource):
+ pass
+
+ volume = db.Volume('.', [Document])
+ router = Router(db.Routes(volume))
+
+ volume['document'].create({'guid': '1'})
+ volume['document'].create({'guid': '2'})
+ volume['document'].create({'guid': '3'})
+ volume['document'].create({'guid': '4'})
+ volume['document'].create({'guid': '5'})
+ self.utime('db/document', 0)
+
+ self.assertEqual({
+ '1': [[1, 1]],
+ '2': [[2, 2]],
+ },
+ this.call(method='GET', path=['document'], cmd='diff'))
+
+ self.assertEqual({
+ '3': [[3, 3]],
+ '4': [[4, 4]],
+ },
+ this.call(method='GET', path=['document'], cmd='diff', environ={'HTTP_X_RANGE': json.dumps([[3, None]])}))
+
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/db/volume.py b/tests/units/db/volume.py
index 22d4782..a770a35 100755
--- a/tests/units/db/volume.py
+++ b/tests/units/db/volume.py
@@ -323,51 +323,6 @@ class VolumeTest(tests.Test):
self.assertRaises(StopIteration, patch.next)
self.assertEqual([[4, None]], r)
- def test_clone(self):
-
- class Document(db.Resource):
-
- @db.stored_property()
- def prop1(self, value):
- return value
-
- @db.stored_property()
- def prop2(self, value):
- return value
-
- @db.stored_property(db.Blob)
- def prop3(self, value):
- return value
-
- @db.stored_property(db.Blob)
- def prop4(self, value):
- return value
-
- volume = db.Volume('.', [Document])
-
- volume['document'].create({
- 'guid': 'guid',
- 'prop1': '1',
- 'prop2': 2,
- 'prop3': volume.blobs.post('333', '3/3').digest,
- 'prop4': volume.blobs.post('4444', '4/4').digest,
- })
- self.utime('db/document/gu/guid', 1)
-
- self.assertEqual([
- {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'},
- {'content-type': '4/4', 'content-length': '4', 'x-seqno': '2'},
- {'resource': 'document'},
- {'guid': 'guid', 'patch': {
- 'guid': {'value': 'guid', 'mtime': 1},
- 'prop1': {'value': '1', 'mtime': 1},
- 'prop2': {'value': 2, 'mtime': 1},
- 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1},
- 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1},
- }},
- ],
- [i.meta if isinstance(i, File) else i for i in volume.clone('document', 'guid')])
-
def test_patch_New(self):
class Document(db.Resource):
@@ -973,6 +928,177 @@ class VolumeTest(tests.Test):
[dict(i) for i in volume.diff(r, files=['foo'])])
self.assertEqual([[4, None]], r)
+ def test_DoNotShiftSeqnoForLocalProps(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop1(self, value):
+ return value
+
+ @db.stored_property(acl=ACL.PUBLIC | ACL.LOCAL)
+ def prop2(self, value):
+ return value
+
+ directory = db.Volume('.', [Document])['document']
+
+ directory.create({'guid': '1', 'prop1': '1', 'prop2': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('db/document', 0)
+ self.assertEqual(
+ {'seqno': 1, 'value': 1, 'mtime': 0},
+ directory['1'].meta('seqno'))
+ self.assertEqual(
+ {'seqno': 1, 'value': '1', 'mtime': 0},
+ directory['1'].meta('prop1'))
+ self.assertEqual(
+ {'value': '1', 'mtime': 0},
+ directory['1'].meta('prop2'))
+
+ directory.update('1', {'prop2': '2'})
+ self.utime('db/document', 0)
+ self.assertEqual(
+ {'seqno': 1, 'value': 1, 'mtime': 0},
+ directory['1'].meta('seqno'))
+ self.assertEqual(
+ {'seqno': 1, 'value': '1', 'mtime': 0},
+ directory['1'].meta('prop1'))
+ self.assertEqual(
+ {'value': '2', 'mtime': 0},
+ directory['1'].meta('prop2'))
+
+ directory.update('1', {'prop1': '2'})
+ self.utime('db/document', 0)
+ self.assertEqual(
+ {'seqno': 2, 'value': 2, 'mtime': 0},
+ directory['1'].meta('seqno'))
+ self.assertEqual(
+ {'seqno': 2, 'value': '2', 'mtime': 0},
+ directory['1'].meta('prop1'))
+ self.assertEqual(
+ {'value': '2', 'mtime': 0},
+ directory['1'].meta('prop2'))
+
+ def test_patch_SeqnoLess(self):
+
+ class Document(db.Resource):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume1 = db.Volume('1', [Document])
+ volume1['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('1/db/document/1/1', 1)
+ volume1.blobs.post('1')
+
+ volume2 = db.Volume('2', [Document])
+ volume2.patch(volume1.diff([[1, None]]), shift_seqno=False)
+
+ self.assertEqual(
+ [(1, '1', 1, '1')],
+ [(i['ctime'], i['prop'], i['mtime'], i['guid']) for i in volume2['document'].find()[0]])
+
+ doc = volume2['document'].get('1')
+ self.assertEqual(0, doc.get('seqno'))
+ assert 'seqno' not in doc.meta('guid')
+ assert 'seqno' not in doc.meta('ctime')
+ assert 'seqno' not in doc.meta('mtime')
+ assert 'seqno' not in doc.meta('prop')
+
+ blob = volume2.blobs.get(hashlib.sha1('1').hexdigest())
+ self.assertEqual({
+ 'x-seqno': '0',
+ 'content-length': '1',
+ 'content-type': 'application/octet-stream',
+ },
+ blob.meta)
+ self.assertEqual('1', file(blob.path).read())
+
+ def test_diff_IgnoreSeqnolessUpdates(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop1(self, value):
+ return value
+
+ @db.stored_property(acl=ACL.PUBLIC | ACL.LOCAL)
+ def prop2(self, value):
+ return value
+
+ volume = db.Volume('.', [Document])
+
+ volume['document'].create({'guid': '1', 'prop1': '1', 'prop2': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('db/document/1/1', 1)
+
+ r = [[1, None]]
+ self.assertEqual([
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'prop1': {'value': '1', 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ],
+ [i.meta if isinstance(i, File) else i for i in volume.diff(r)])
+ self.assertEqual([[2, None]], r)
+
+ volume['document'].update('1', {'prop2': '2'})
+ self.utime('db/document/1/1', 1)
+
+ r = [[1, None]]
+ self.assertEqual([
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'prop1': {'value': '1', 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ],
+ [i.meta if isinstance(i, File) else i for i in volume.diff(r)])
+ self.assertEqual([[2, None]], r)
+
+ volume['document'].update('1', {'prop1': '2'})
+ self.utime('db/document/1/1', 1)
+
+ r = [[1, None]]
+ self.assertEqual([
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'prop1': {'value': '2', 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 2]]},
+ ],
+ [i.meta if isinstance(i, File) else i for i in volume.diff(r)])
+ self.assertEqual([[3, None]], r)
+
+ self.assertEqual(False, volume['document'].patch('1', {'prop1': {'mtime': 2, 'value': '3'}}, seqno=False))
+ self.assertEqual('3', volume['document']['1']['prop1'])
+ self.utime('db/document/1/1', 1)
+
+ r = [[1, None]]
+ self.assertEqual([
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 2]]},
+ ],
+ [i.meta if isinstance(i, File) else i for i in volume.diff(r)])
+ self.assertEqual([[3, None]], r)
+
+
+
+
class _SessionSeqno(object):
diff --git a/tests/units/node/slave.py b/tests/units/node/slave.py
index 2b32b70..10e5742 100755
--- a/tests/units/node/slave.py
+++ b/tests/units/node/slave.py
@@ -107,7 +107,6 @@ class SlaveTest(tests.Test):
self.assertEqual([[7, None]], json.load(file('slave/var/push.ranges')))
coroutine.sleep(1)
- slave.put(['document', guid1], {'message': 'a'})
slave.put(['document', guid2], {'message': 'b'})
slave.put(['document', guid3], {'message': 'c'})
guid4 = slave.post(['document'], {'message': 'd', 'title': ''})
@@ -119,7 +118,7 @@ class SlaveTest(tests.Test):
]),
sorted(master.get(['document'], reply=['guid', 'message'])['result']))
self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges')))
- self.assertEqual([[12, None]], json.load(file('slave/var/push.ranges')))
+ self.assertEqual([[11, None]], json.load(file('slave/var/push.ranges')))
def test_online_sync_Pull(self):
self.fork_master([User, self.Document])
@@ -172,7 +171,6 @@ class SlaveTest(tests.Test):
self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges')))
coroutine.sleep(1)
- master.put(['document', guid1], {'message': 'a'})
master.put(['document', guid2], {'message': 'b'})
master.put(['document', guid3], {'message': 'c'})
guid4 = master.post(['document'], {'message': 'd', 'title': ''})
@@ -183,7 +181,7 @@ class SlaveTest(tests.Test):
{'guid': guid4, 'message': 'd'},
],
slave.get(['document'], reply=['guid', 'message'])['result'])
- self.assertEqual([[12, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[11, None]], json.load(file('slave/var/pull.ranges')))
self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges')))
def test_online_sync_PullBlobs(self):
diff --git a/tests/units/toolkit/parcel.py b/tests/units/toolkit/parcel.py
index 1a24a3f..17fa146 100755
--- a/tests/units/toolkit/parcel.py
+++ b/tests/units/toolkit/parcel.py
@@ -18,7 +18,7 @@ from sugar_network.toolkit import parcel, http, coroutine
class ParcelTest(tests.Test):
- def test_decode(self):
+ def test_decode_Zipped(self):
stream = zips(
json.dumps({'foo': 'bar'}) + '\n'
)
@@ -96,7 +96,85 @@ class ParcelTest(tests.Test):
self.assertRaises(StopIteration, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- def test_decode_WithLimit(self):
+ def test_decode_NotZipped(self):
+ stream = StringIO(
+ json.dumps({'foo': 'bar'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = StringIO(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual('foo', packet['bar'])
+ packet_iter = iter(packet)
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = StringIO(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = StringIO(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = StringIO(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_ZippedWithLimit(self):
payload = zips(
json.dumps({}) + '\n' +
json.dumps({'packet': 'first'}) + '\n' +
@@ -114,6 +192,24 @@ class ParcelTest(tests.Test):
pass
self.assertEqual(len(payload), stream.tell())
+ def test_decode_NotZippedWithLimit(self):
+ payload = StringIO(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 'first'}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ ).getvalue()
+ tail = '.' * 100
+
+ stream = StringIO(payload + tail)
+ for i in parcel.decode(stream):
+ pass
+ self.assertEqual(len(payload + tail), stream.tell())
+
+ stream = StringIO(payload + tail)
+ for i in parcel.decode(stream, limit=len(payload)):
+ pass
+ self.assertEqual(len(payload), stream.tell())
+
def test_decode_Empty(self):
self.assertRaises(http.BadRequest, parcel.decode(StringIO()).next)
@@ -254,7 +350,7 @@ class ParcelTest(tests.Test):
self.assertRaises(StopIteration, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- def test_encode(self):
+ def test_encode_Zipped(self):
stream = ''.join([i for i in parcel.encode([])])
self.assertEqual(
json.dumps({}) + '\n' +
@@ -300,6 +396,52 @@ class ParcelTest(tests.Test):
json.dumps({'packet': 'last'}) + '\n',
unzips(stream))
+ def test_encode_NotZipped(self):
+ stream = ''.join([i for i in parcel.encode([], compresslevel=0)])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ stream)
+
+ stream = ''.join([i for i in parcel.encode([(None, None, None)], header={'foo': 'bar'}, compresslevel=0)])
+ self.assertEqual(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': None}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ stream)
+
+ stream = ''.join([i for i in parcel.encode([
+ (1, {}, None),
+ ('2', {'n': 2}, []),
+ ('3', {'n': 3}, iter([])),
+ ], compresslevel=0)])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'packet': '2', 'n': 2}) + '\n' +
+ json.dumps({'packet': '3', 'n': 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ stream)
+
+ stream = ''.join([i for i in parcel.encode([
+ (1, None, [{1: 1}]),
+ (2, None, [{2: 2}, {2: 2}]),
+ (3, None, [{3: 3}, {3: 3}, {3: 3}]),
+ ], compresslevel=0)])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({1: 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({2: 2}) + '\n' +
+ json.dumps({2: 2}) + '\n' +
+ json.dumps({'packet': 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ stream)
+
def test_limited_encode(self):
RECORD = 1024 * 1024
diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py
index e9ee798..63d4646 100755
--- a/tests/units/toolkit/router.py
+++ b/tests/units/toolkit/router.py
@@ -535,7 +535,7 @@ class RouterTest(tests.Test):
['_afz'],
[i for i in router({'REQUEST_METHOD': 'PROBE', 'PATH_INFO': '/'}, lambda *args: None)])
- def test_routes_Post(self):
+ def test_routes_Postroutes(self):
postroutes = []
class A(object):
@@ -551,24 +551,28 @@ class RouterTest(tests.Test):
@postroute
def _(self, result, exception):
postroutes.append(('_', result, str(exception)))
+ return result
class B1(A):
@postroute
def z(self, result, exception):
postroutes.append(('z', result, str(exception)))
+ return result
class B2(object):
@postroute
def f(self, result, exception):
postroutes.append(('f', result, str(exception)))
+ return result
class C(B1, B2):
@postroute
def a(self, result, exception):
postroutes.append(('a', result, str(exception)))
+ return result
router = Router(C())
@@ -595,6 +599,25 @@ class RouterTest(tests.Test):
],
postroutes)
+ def test_routes_UpdateResultInPostroutes(self):
+ postroutes = []
+
+ class A(object):
+
+ @route('OK')
+ def ok(self):
+ return 'ok'
+
+ @postroute
+ def postroute(self, result, exception):
+ return result + '!'
+
+ router = Router(A())
+
+ self.assertEqual(
+ ['ok!'],
+ [i for i in router({'REQUEST_METHOD': 'OK', 'PATH_INFO': '/'}, lambda *args: None)])
+
def test_routes_WildcardsAsLastResort(self):
class Routes(object):