author     Aleksey Lim <alsroot@sugarlabs.org>    2014-03-07 17:02:19 (GMT)
committer  Aleksey Lim <alsroot@sugarlabs.org>    2014-03-09 05:15:13 (GMT)
commit     41cb9e831db3d66973292fbdb4c13fb658ac9f59 (patch)
tree       dec6b37654de3f0d6d12dca6c413b2890cd8ef9f
parent     90f74541ec4925bad47466e39517c22ff7eadfe4 (diff)
Fix node synchronization; remove usage stats, which should be a standalone project/process
-rwxr-xr-x  sugar-network | 4
-rwxr-xr-x  sugar-network-client | 5
-rwxr-xr-x  sugar-network-node | 33
-rw-r--r--  sugar_network/db/blobs.py | 15
-rw-r--r--  sugar_network/db/resource.py | 7
-rw-r--r--  sugar_network/db/volume.py | 38
-rw-r--r--  sugar_network/node/__init__.py | 25
-rw-r--r--  sugar_network/node/downloads.py | 128
-rw-r--r--  sugar_network/node/files.py | 183
-rw-r--r--  sugar_network/node/master.py | 288
-rw-r--r--  sugar_network/node/model.py | 67
-rw-r--r--  sugar_network/node/routes.py | 89
-rw-r--r--  sugar_network/node/slave.py | 203
-rw-r--r--  sugar_network/node/stats_user.py | 127
-rw-r--r--  sugar_network/toolkit/__init__.py | 2
-rw-r--r--  sugar_network/toolkit/http.py | 16
-rw-r--r--  sugar_network/toolkit/rrd.py | 296
-rw-r--r--  tests/__init__.py | 50
-rw-r--r--  tests/data/c41529f1d629e60bdc21434011133f2c8f65f643 | 15
-rwxr-xr-x  tests/integration/master_personal.py | 2
-rwxr-xr-x  tests/integration/master_slave.py | 2
-rwxr-xr-x  tests/integration/node_client.py | 4
-rwxr-xr-x  tests/integration/node_packages.py | 8
-rwxr-xr-x  tests/regression/api.py | 2
-rwxr-xr-x  tests/units/client/offline_routes.py | 4
-rwxr-xr-x  tests/units/client/online_routes.py | 34
-rwxr-xr-x  tests/units/client/releases.py | 6
-rwxr-xr-x  tests/units/client/routes.py | 12
-rwxr-xr-x  tests/units/db/volume.py | 82
-rw-r--r--  tests/units/node/__main__.py | 10
-rwxr-xr-x  tests/units/node/downloads.py | 119
-rwxr-xr-x  tests/units/node/files.py | 357
-rwxr-xr-x  tests/units/node/master.py | 549
-rwxr-xr-x  tests/units/node/model.py | 432
-rwxr-xr-x  tests/units/node/node.py | 118
-rwxr-xr-x  tests/units/node/slave.py | 529
-rwxr-xr-x  tests/units/node/stats_user.py | 123
-rwxr-xr-x  tests/units/node/sync_master.py | 668
-rwxr-xr-x  tests/units/node/sync_offline.py | 249
-rwxr-xr-x  tests/units/node/sync_online.py | 272
-rw-r--r--  tests/units/toolkit/__main__.py | 1
-rwxr-xr-x  tests/units/toolkit/rrd.py | 291
42 files changed, 1513 insertions, 3952 deletions
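
The rework below replaces the old cookie/Sequence sync machinery with parcel-encoded packets and plain range lists. For orientation, a slave round trip now looks roughly like this; a hedged sketch only, assuming a populated db.Volume and a reachable master (the 'slave-key' guid is made up, the real code derives it from the node's key):

    from sugar_network import db
    from sugar_network.toolkit import http, parcel

    volume = db.Volume('/var/lib/sugar-network', [])   # resource list elided
    push_r = [[1, None]]   # local seqnos the master has not acked yet
    pull_r = [[1, None]]   # master seqnos not yet merged locally

    conn = http.Connection('http://node.sugarlabs.org')
    response = conn.request('POST',
        data=parcel.encode([
            ('pull', {'ranges': pull_r}, None),
            ('push', None, volume.diff(push_r)),
        ], header={'from': 'slave-key', 'to': 'node.sugarlabs.org'}),
        params={'cmd': 'sync'},
        headers={'Transfer-Encoding': 'chunked'})
    for packet in parcel.decode(response.raw):
        pass   # 'push' packets feed volume.patch(); 'ack' packets trim push_r/pull_r
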
diff --git a/sugar-network b/sugar-network
index 63e1078..04caa59 100755
--- a/sugar-network
+++ b/sugar-network
@@ -78,7 +78,7 @@ class ClientRouter(Router, ClientRoutes):
home = db.Volume(client.path('db'), RESOURCES)
Router.__init__(self, self)
ClientRoutes.__init__(self, home,
- client.api_url.value if not offline.value else None,
+ client.api.value if not offline.value else None,
no_subscription=True)
if not offline.value:
for __ in self.subscribe(event='inline', state='online'):
@@ -139,7 +139,7 @@ class Application(application.Application):
if self.check_for_instance():
conn = IPCConnection()
else:
- conn = Connection(client.api_url.value)
+ conn = Connection(client.api.value)
guid = conn.upload(['release'], path, cmd='submit', **props)
if porcelain.value:
diff --git a/sugar-network-client b/sugar-network-client
index 87bf65e..3891c99 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -43,7 +43,6 @@ class Application(application.Daemon):
def __init__(self, **kwargs):
application.Daemon.__init__(self, **kwargs)
- node.sync_layers.value = client.layers.value
self.jobs = coroutine.Pool()
new_root = (client.local_root.value != client.local_root.default)
@@ -102,7 +101,7 @@ class Application(application.Daemon):
def run(self):
volume = db.Volume(client.path('db'), RESOURCES)
routes = CachedClientRoutes(volume,
- client.api_url.value if not client.server_mode.value else None)
+ client.api.value if not client.server_mode.value else None)
router = Router(routes, allow_spawn=True)
logging.info('Listening for IPC requests on %s port',
@@ -203,7 +202,7 @@ Option.seek('main', [toolkit.cachedir])
Option.seek('webui', webui)
Option.seek('client', client)
Option.seek('db', db)
-Option.seek('node', [node.port, node.find_limit, node.sync_layers])
+Option.seek('node', [node.port, node.find_limit])
Option.seek('user-stats', stats_user)
app = Application(
diff --git a/sugar-network-node b/sugar-network-node
index 6721337..f35e8ad 100755
--- a/sugar-network-node
+++ b/sugar-network-node
@@ -19,16 +19,14 @@ import os
import locale
import gettext
import logging
-from os.path import exists, join
+from os.path import exists
from gevent import monkey
-from sugar_network import db, node, client, toolkit, model
-from sugar_network.node import stats_user, obs, master, slave
-from sugar_network.node import model as master_model
-from sugar_network.node.routes import generate_node_stats
+from sugar_network import db, node, toolkit, model
+from sugar_network.node import obs, master, slave
from sugar_network.toolkit.http import Connection
-from sugar_network.toolkit.router import Router, Request, Response
+from sugar_network.toolkit.router import Router
from sugar_network.toolkit import coroutine, application, Option, enforce
@@ -55,20 +53,16 @@ class Application(application.Daemon):
if node.certfile.value:
ssl_args['certfile'] = node.certfile.value
- master_path = join(node.data_root.value, 'master')
- if exists(master_path):
- with file(master_path) as f:
- node_key = f.read().strip()
+ if node.master_mode.value:
node_class = master.MasterRoutes
- resources = master_model.RESOURCES
- logging.info('Start %s node in master mode', node_key)
+ resources = master.RESOURCES
+ logging.info('Start master node')
else:
- node_key = join(node.data_root.value, 'node.key')
node_class = slave.SlaveRoutes
resources = model.RESOURCES
logging.info('Start slave node')
volume = db.Volume(node.data_root.value, resources)
- cp = node_class(node_key, volume, find_limit=node.find_limit.value)
+ cp = node_class(volume=volume, find_limit=node.find_limit.value)
self.jobs.spawn(volume.populate)
logging.info('Listening for requests on %s:%s',
@@ -81,7 +75,6 @@ class Application(application.Daemon):
try:
self.jobs.join()
finally:
- cp.close()
volume.close()
def shutdown(self):
@@ -102,14 +95,6 @@ class Application(application.Daemon):
path = self.args.pop(0)
self._ensure_instance().post(cmd='offline-sync', path=path)
- @application.command(
- 're-generate node statistics', name='restat')
- def restat(self):
- enforce(not self.check_for_instance(), 'Shutdown the server at first')
- volume = db.Volume(node.data_root.value, model.RESOURCES)
- volume.populate()
- generate_node_stats(volume, join(node.stats_root.value, 'node'))
-
def _ensure_instance(self):
enforce(self.check_for_instance(), 'Node is not started')
return Connection('http://localhost:%s' %
@@ -126,9 +111,7 @@ locale.setlocale(locale.LC_ALL, '')
Option.seek('main', application)
Option.seek('main', [toolkit.cachedir])
-Option.seek('client', [client.api_url])
Option.seek('node', node)
-Option.seek('user-stats', stats_user)
Option.seek('obs', obs)
Option.seek('db', db)
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index cd795c6..52bc324 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -18,7 +18,7 @@ import logging
import hashlib
import mimetypes
from contextlib import contextmanager
-from os.path import exists, abspath, join, dirname
+from os.path import exists, abspath, join, dirname, isdir
from sugar_network import toolkit
from sugar_network.toolkit.router import File
@@ -104,10 +104,16 @@ class Blobs(object):
path = self.path(digest)
if exists(path + _META_SUFFIX):
return File(path, digest, _read_meta(path))
+ elif isdir(path):
+ return _lsdir(path, digest)
def delete(self, path):
self._delete(path, None)
+ def populate(self, path=None, recursive=True):
+ for __ in self.diff([[1, None]], path or '', recursive):
+ pass
+
def diff(self, r, path=None, recursive=True):
if path is None:
is_files = False
@@ -228,3 +234,10 @@ def _read_meta(path):
key, value = line.split(':', 1)
meta[key] = value.strip()
return meta
+
+
+def _lsdir(root, rel_root):
+ for filename in os.listdir(root):
+ path = join(root, filename)
+ if exists(path + _META_SUFFIX):
+ yield File(path, join(rel_root, filename), _read_meta(path))
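
The new branch makes Blobs.get() polymorphic: a digest backed by a regular blob returns a single File, while a path backed by a directory yields the blobs inside it, counting only entries that carry a companion .meta file. A standard-library toy of the same behaviour:

    import os
    import tempfile
    from os.path import join, exists, isdir

    META_SUFFIX = '.meta'   # stand-in for the module's _META_SUFFIX

    def get(path, digest):
        if exists(path + META_SUFFIX):
            return (path, digest)                       # single blob
        elif isdir(path):
            return [(join(path, i), join(digest, i))    # directory listing
                    for i in os.listdir(path)
                    if exists(join(path, i) + META_SUFFIX)]

    root = tempfile.mkdtemp()
    os.mkdir(join(root, 'packages'))
    open(join(root, 'packages', 'vim'), 'w').close()
    open(join(root, 'packages', 'vim') + META_SUFFIX, 'w').close()
    print(get(join(root, 'packages'), 'packages'))
    # -> [('/tmp/.../packages/vim', 'packages/vim')]
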
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index 71a3efd..9d94929 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -123,7 +123,6 @@ class Resource(object):
def diff(self, r):
patch = {}
- last_seqno = None
for name, prop in self.metadata.items():
if name == 'seqno' or prop.acl & ACL.CALC:
continue
@@ -133,16 +132,16 @@ class Resource(object):
seqno = meta.get('seqno')
if not ranges.contains(r, seqno):
continue
- last_seqno = max(seqno, last_seqno)
value = meta.get('value')
if isinstance(prop, Aggregated):
value_ = {}
for key, agg in value.items():
- if ranges.contains(r, agg.pop('seqno')):
+ agg_seqno = agg.pop('seqno')
+ if ranges.contains(r, agg_seqno):
value_[key] = agg
value = value_
patch[name] = {'mtime': meta['mtime'], 'value': value}
- return last_seqno, patch
+ return patch
def format_patch(self, props):
if not props:
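
Resource.diff() no longer computes a last_seqno of its own; the volume.py hunk below reads doc['seqno'] instead. For Aggregated properties, only items whose own seqno falls inside the requested ranges survive. In miniature, with a stand-in for toolkit.ranges.contains (ranges are lists of [start, end] pairs, end None meaning open-ended):

    def contains(r, seqno):
        return any(start <= seqno and (end is None or seqno <= end)
                   for start, end in r)

    value = {'old': {'seqno': 3, 'note': 'already synced'},
             'new': {'seqno': 9, 'note': 'changed since'}}
    r = [[5, None]]                  # caller asks for seqno 5 and up
    value_ = {}
    for key, agg in value.items():
        agg = dict(agg)              # the real code pops in place
        if contains(r, agg.pop('seqno')):
            value_[key] = agg
    print(value_)                    # {'new': {'note': 'changed since'}}
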
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 5ec5683..5d9bac1 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -76,8 +76,15 @@ class Volume(dict):
for __ in cls.populate():
coroutine.dispatch()
- def diff(self, r, files=None, one_way=False):
+ def diff(self, r, exclude=None, files=None, one_way=False):
+ if exclude:
+ include = deepcopy(r)
+ ranges.exclude(include, exclude)
+ else:
+ include = r
last_seqno = None
+ found = False
+
try:
for resource, directory in self.items():
if one_way and directory.resource.one_way:
@@ -90,33 +97,34 @@ class Volume(dict):
query += str(end)
docs, __ = directory.find(query=query, order_by='seqno')
for doc in docs:
- seqno, patch = doc.diff(r)
- if not patch:
- continue
- yield {'guid': doc.guid, 'patch': patch}
- last_seqno = max(last_seqno, seqno)
- for blob in self.blobs.diff(r):
+ patch = doc.diff(include)
+ if patch:
+ yield {'guid': doc.guid, 'patch': patch}
+ found = True
+ last_seqno = max(last_seqno, doc['seqno'])
+ for blob in self.blobs.diff(include):
seqno = int(blob.pop('x-seqno'))
yield blob
+ found = True
last_seqno = max(last_seqno, seqno)
for dirpath in files or []:
- for blob in self.blobs.diff(r, dirpath):
+ for blob in self.blobs.diff(include, dirpath):
seqno = int(blob.pop('x-seqno'))
yield blob
+ found = True
last_seqno = max(last_seqno, seqno)
except StopIteration:
pass
- if last_seqno:
- commit_r = deepcopy(r)
+ if found:
+ commit_r = include if exclude else deepcopy(r)
ranges.exclude(commit_r, last_seqno + 1, None)
ranges.exclude(r, None, last_seqno)
yield {'commit': commit_r}
def patch(self, records):
directory = None
- commit_r = []
- merged_r = []
+ committed = []
seqno = None
for record in records:
@@ -137,12 +145,10 @@ class Volume(dict):
commit = record.get('commit')
if commit is not None:
- ranges.include(commit_r, commit)
+ ranges.include(committed, commit)
continue
- if seqno is not None:
- ranges.include(merged_r, seqno, seqno)
- return commit_r, merged_r
+ return seqno, committed
def __enter__(self):
return self
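
Volume.diff() and patch() now lean on toolkit.ranges throughout: the exclude argument carves already-acknowledged ranges out of the requested ones, and the trailing commit record trims both sides. A minimal, runnable stand-in for the exclude() call these hunks use (the real helper also accepts a whole range list as its second argument):

    def exclude(r, start, end):
        # remove the closed interval [start, end] from r, a sorted list
        # of [lo, hi] pairs; hi or end may be None, meaning open-ended
        result = []
        for lo, hi in r:
            if hi is not None and hi < start:
                result.append([lo, hi])          # entirely below the cut
            elif end is not None and lo > end:
                result.append([lo, hi])          # entirely above the cut
            else:
                if lo < start:
                    result.append([lo, start - 1])
                if end is not None and (hi is None or hi > end):
                    result.append([end + 1, hi])
        r[:] = result

    r = [[1, None]]
    exclude(r, 1, 5)      # seqnos 1..5 merged
    print(r)              # [[6, None]]
    exclude(r, 10, 10)    # seqno 10 arrived out of band
    print(r)              # [[6, 9], [11, None]]
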
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index a4360b4..14d675c 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -34,28 +34,17 @@ certfile = Option(
name='certfile')
data_root = Option(
- 'path to a directory to place server data',
+ 'path to a directory to place node data',
default='/var/lib/sugar-network', name='data_root')
find_limit = Option(
'limit the resulting list for search requests',
default=64, type_cast=int, name='find-limit')
-stats_root = Option(
- 'path to the root directory for placing stats',
- default='/var/lib/sugar-network/stats', name='stats_root')
+mode = Option(
'node running mode, should be one of "slave", "proxy", or "master"',
+ default='slave')
-files_root = Option(
- 'path to a directory to keep files synchronized between nodes',
- default='/var/lib/sugar-network/files', name='files_root')
-
-pull_timeout = Option(
- 'delay in seconds to return to sync-pull requester to wait until '
- 'pull request will be ready',
- default=30, type_cast=int)
-
-sync_layers = Option(
- 'comma separated list of layers to restrict Sugar Network '
- 'synchronization content',
- default=[], type_cast=Option.list_cast,
- type_repr=Option.list_repr, name='sync-layers')
+master_api = Option(
+ 'master API url either to connect to (for slave or proxy nodes), or '
+ 'to provide from (for master nodes)')
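
The two new options replace the stats, files, and pull-timeout knobs. A hedged sketch of how they pair up (programmatic assignment is for illustration only; normally the values come from the node section of the config):

    from sugar_network import node

    node.mode.value = 'master'        # or 'slave' (the default) or 'proxy'
    node.master_api.value = 'http://node.sugarlabs.org'
    # a master serves master_api itself and, per the master.py hunk below,
    # derives its guid from the URL's netloc; slaves and proxies connect to it
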
diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py
deleted file mode 100644
index b7156bc..0000000
--- a/sugar_network/node/downloads.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright (C) 2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""Persistent pool with temporary files prepared to download."""
-
-import os
-import json
-import hashlib
-import logging
-from glob import glob
-from os.path import join, splitext, exists
-
-from sugar_network.toolkit import pylru, coroutine, exception
-
-
-# Maximum numer of postponed pulls master can handle at the same time
-_POOL_SIZE = 256
-_TAG_SUFFIX = '.tag'
-
-_logger = logging.getLogger('node.downloads')
-
-
-class Pool(object):
-
- def __init__(self, root):
- self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop())
- if not exists(root):
- os.makedirs(root)
- self._root = root
-
- for tag_path in glob(join(root, '*.tag')):
- path, __ = splitext(tag_path)
- if exists(path):
- try:
- with file(tag_path) as f:
- key, tag = json.load(f)
- pool_key = json.dumps(key)
- self._pool[pool_key] = _Download(key, tag, path)
- continue
- except Exception:
- exception('Cannot open %r download, recreate', tag_path)
- os.unlink(path)
- os.unlink(tag_path)
-
- def get(self, key):
- key = json.dumps(key)
- if key in self._pool:
- return self._pool[key]
-
- def set(self, key, tag, fetcher, *args, **kwargs):
- pool_key = json.dumps(key)
- path = join(self._root, hashlib.md5(pool_key).hexdigest())
-
- def do_fetch():
- try:
- complete = fetcher(*args, path=path, **kwargs)
- except Exception:
- exception('Error while fetching %r', self)
- if exists(path):
- os.unlink(path)
- return True
- with file(path + _TAG_SUFFIX, 'w') as f:
- json.dump([key, tag], f)
- return complete
-
- job = coroutine.spawn(do_fetch)
- dl = self._pool[pool_key] = _Download(key, tag, path, job)
- return dl
-
- def remove(self, key):
- key = json.dumps(key)
- if key in self._pool:
- self._pool.peek(key).pop()
- del self._pool[key]
-
-
-class _Download(dict):
-
- def __init__(self, key, tag, path, job=None):
- self.tag = tag
- self._key = key
- self._path = path
- self._job = job
-
- def __repr__(self):
- return '<Download %r path=%r>' % (self._key, self._path)
-
- @property
- def ready(self):
- # pylint: disable-msg=E1101
- return self._job is None or self._job.dead
-
- @property
- def complete(self):
- return self._job is not None and self._job.value
-
- @property
- def length(self):
- if exists(self._path):
- return os.stat(self._path).st_size
-
- def open(self):
- if exists(self._path):
- return file(self._path, 'rb')
-
- def pop(self):
- if self._job is not None and not self._job.dead:
- _logger.debug('Abort fetching %r', self)
- self._job.kill()
-
- if exists(self._path):
- os.unlink(self._path)
- if exists(self._path + _TAG_SUFFIX):
- os.unlink(self._path + _TAG_SUFFIX)
-
- _logger.debug('Throw out %r from the pool', self)
diff --git a/sugar_network/node/files.py b/sugar_network/node/files.py
deleted file mode 100644
index 4fb64ca..0000000
--- a/sugar_network/node/files.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import json
-import logging
-from bisect import bisect_left
-from shutil import copyfileobj
-from os.path import join, exists, relpath, lexists, dirname
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine
-
-
-_logger = logging.getLogger('node.sync_files')
-
-
-def merge(files_path, packet):
- files_path = files_path.rstrip(os.sep)
- if not exists(files_path):
- os.makedirs(files_path)
- commit_seq = None
-
- for record in packet:
- op = record.get('op')
- if op == 'update':
- path = join(files_path, record['path'])
- if not exists(dirname(path)):
- os.makedirs(dirname(path))
- with toolkit.new_file(path) as f:
- copyfileobj(record['blob'], f)
- elif op == 'delete':
- path = join(files_path, record['path'])
- if lexists(path):
- os.unlink(path)
- elif op == 'commit':
- commit_seq = record['sequence']
-
- return commit_seq
-
-
-class Index(object):
-
- def __init__(self, files_path, index_path, seqno):
- self._files_path = files_path.rstrip(os.sep)
- self._index_path = index_path
- self._seqno = seqno
- self._index = []
- self._stamp = 0
- self._mutex = coroutine.Lock()
-
- if exists(self._index_path):
- with file(self._index_path) as f:
- self._index, self._stamp = json.load(f)
-
- if not exists(self._files_path):
- os.makedirs(self._files_path)
-
- def sync(self):
- with self._mutex:
- return self._sync()
-
- def diff(self, in_seq, out_seq=None, **kwargs):
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_initial_diff = not out_seq
-
- # Below calls will trigger coroutine switches, thius,
- # avoid changing `self._index` by different coroutines.
- with self._mutex:
- self._sync()
-
- _logger.debug('Start sync: in_seq=%r', in_seq)
-
- files = 0
- deleted = 0
- pos = 0
-
- try:
- for start, end in in_seq:
- pos = bisect_left(self._index, [start, None, None], pos)
- for pos, (seqno, path, mtime) in \
- enumerate(self._index[pos:]):
- if end is not None and seqno > end:
- break
- coroutine.dispatch()
- if mtime < 0:
- yield {'op': 'delete', 'path': path}
- deleted += 1
- else:
- blob_path = join(self._files_path, path)
- yield {'op': 'update',
- 'path': path,
- 'blob_size': os.stat(blob_path).st_size,
- 'blob': toolkit.iter_file(blob_path),
- }
- out_seq.include(start, seqno)
- start = seqno
- files += 1
- except StopIteration:
- pass
-
- if is_initial_diff:
- # There is only one diff, so, we can stretch it to remove holes
- out_seq.stretch()
- yield {'op': 'commit', 'sequence': out_seq}
-
- _logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r',
- in_seq, out_seq, files, deleted)
-
- def _sync(self):
- if os.stat(self._files_path).st_mtime <= self._stamp:
- return False
-
- new_files = set()
- updates = 0
- deletes = 0
-
- # Populate list of new files at first
- for root, __, files in os.walk(self._files_path):
- coroutine.dispatch()
- rel_root = relpath(root, self._files_path)
- if rel_root == '.':
- rel_root = ''
- else:
- rel_root += os.sep
- for filename in files:
- coroutine.dispatch()
- path = join(root, filename)
- if os.lstat(path).st_mtime > self._stamp:
- new_files.add(rel_root + filename)
-
- # Check for updates for already tracked files
- tail = []
- for pos, (__, rel_path, mtime) in enumerate(self._index[:]):
- coroutine.dispatch()
- path = join(self._files_path, rel_path)
- existing = lexists(path)
- if existing == (mtime >= 0) and \
- (not existing or os.lstat(path).st_mtime == mtime):
- continue
- if existing:
- new_files.discard(rel_path)
- pos -= len(tail)
- self._index = self._index[:pos] + self._index[pos + 1:]
- tail.append([
- self._seqno.next(),
- rel_path,
- int(os.lstat(path).st_mtime) if existing else -1,
- ])
- if existing:
- updates += 1
- else:
- deletes += 1
- self._index.extend(tail)
-
- _logger.debug('Updated %r index: new=%r updates=%r deletes=%r',
- self._files_path, len(self._files_path), updates, deletes)
-
- # Finally, add new files
- for rel_path in sorted(new_files):
- coroutine.dispatch()
- mtime = os.lstat(join(self._files_path, rel_path)).st_mtime
- self._index.append([self._seqno.next(), rel_path, mtime])
-
- self._stamp = os.stat(self._files_path).st_mtime
- if self._seqno.commit():
- with toolkit.new_file(self._index_path) as f:
- json.dump((self._index, self._stamp), f)
-
- return True
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index c7c22e0..61d32fb 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2013 Aleksey Lim
+# Copyright (C) 2013-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -13,17 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-import base64
import logging
-from Cookie import SimpleCookie
-from os.path import join
+from urlparse import urlsplit
-from sugar_network import node, toolkit
-from sugar_network.node import sync, stats_user, files, model, downloads, obs
+from sugar_network import toolkit
+from sugar_network.node import obs, master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit.coroutine import this
+from sugar_network.toolkit import http, parcel, pylru, ranges, enforce
RESOURCES = (
@@ -33,215 +31,107 @@ RESOURCES = (
'sugar_network.model.user',
)
-_ONE_WAY_DOCUMENTS = ['report']
-
_logger = logging.getLogger('node.master')
class MasterRoutes(NodeRoutes):
- def __init__(self, guid, volume, **kwargs):
- NodeRoutes.__init__(self, guid, volume=volume, **kwargs)
-
- self._pulls = {
- 'pull': lambda **kwargs:
- ('diff', None, model.diff(self.volume,
- ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)),
- 'files_pull': lambda **kwargs:
- ('files_diff', None, self._files.diff(**kwargs)),
- }
-
- self._pull_queue = downloads.Pool(
- join(toolkit.cachedir.value, 'pulls'))
- self._files = None
-
- if node.files_root.value:
- self._files = files.Index(node.files_root.value,
- join(volume.root, 'files.index'), volume.seqno)
-
- @route('POST', cmd='sync',
- acl=ACL.AUTH)
- def sync(self, request):
- reply, cookie = self._push(sync.decode(request.content_stream))
- exclude_seq = None
- if len(cookie.sent) == 1:
- exclude_seq = cookie.sent.values()[0]
- for op, layer, seq in cookie:
- reply.append(self._pulls[op](in_seq=seq,
- exclude_seq=exclude_seq, layer=layer))
- return sync.encode(reply, src=self.guid)
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, urlsplit(master_api.value).netloc, **kwargs)
+ self._pulls = pylru.lrucache(1024)
+
+ @route('POST', cmd='sync', arguments={'accept_length': int})
+ def sync(self, accept_length):
+ return parcel.encode(self._push() + (self._pull() or []),
+ limit=accept_length, header={'from': self.guid},
+ on_complete=this.cookie.clear)
@route('POST', cmd='push')
- def push(self, request, response):
- reply, cookie = self._push(sync.package_decode(request.content_stream))
- # Read passed cookie only after excluding `merged_seq`.
- # If there is `pull` out of currently pushed packet, excluding
- # `merged_seq` should not affect it.
- cookie.update(_Cookie(request))
- cookie.store(response)
- return sync.package_encode(reply, src=self.guid)
-
- @route('GET', cmd='pull',
- mime_type='application/octet-stream',
- arguments={'accept_length': int})
- def pull(self, request, response, accept_length=None):
- cookie = _Cookie(request)
- if not cookie:
- _logger.warning('Requested full dump in pull command')
- cookie.append(('pull', None, toolkit.Sequence([[1, None]])))
- cookie.append(('files_pull', None, toolkit.Sequence([[1, None]])))
-
- exclude_seq = None
- if len(cookie.sent) == 1:
- exclude_seq = toolkit.Sequence(cookie.sent.values()[0])
-
- reply = None
- for pull_key in cookie:
- op, layer, seq = pull_key
-
- pull = self._pull_queue.get(pull_key)
- if pull is not None:
- if not pull.ready:
- continue
- if not pull.tag:
- self._pull_queue.remove(pull_key)
- cookie.remove(pull_key)
- continue
- if accept_length is None or pull.length <= accept_length:
- _logger.debug('Found ready to use %r', pull)
- if pull.complete:
- cookie.remove(pull_key)
- else:
- seq.exclude(pull.tag)
- reply = pull.open()
- break
- _logger.debug('Existing %r is too big, will recreate', pull)
- self._pull_queue.remove(pull_key)
-
- out_seq = toolkit.Sequence()
- pull = self._pull_queue.set(pull_key, out_seq,
- sync.sneakernet_encode,
- [self._pulls[op](in_seq=seq, out_seq=out_seq,
- exclude_seq=exclude_seq, layer=layer,
- fetch_blobs=True)],
- limit=accept_length, src=self.guid)
- _logger.debug('Start new %r', pull)
+ def push(self):
+ return parcel.encode(self._push(), header={'from': self.guid})
+ @route('GET', cmd='pull', arguments={'accept_length': int})
+ def pull(self, accept_length):
+ reply = self._pull()
if reply is None:
- if cookie:
- _logger.debug('No ready pulls')
- # TODO Might be useful to set meaningful value here
- cookie.delay = node.pull_timeout.value
- else:
- _logger.debug('Nothing to pull')
-
- cookie.store(response)
- return reply
+ return None
+ return parcel.encode(reply, limit=accept_length,
+ header={'from': self.guid}, on_complete=this.cookie.clear)
@route('PUT', ['context', None], cmd='presolve',
acl=ACL.AUTH, mime_type='application/json')
def presolve(self, request):
- enforce(node.files_root.value, http.BadRequest, 'Disabled')
- aliases = self.volume['context'].get(request.guid)['aliases']
+ aliases = this.volume['context'].get(request.guid)['aliases']
enforce(aliases, http.BadRequest, 'Nothing to presolve')
- return obs.presolve(None, aliases, node.files_root.value)
+ return obs.presolve(None, aliases, this.volume.blobs.path('packages'))
def status(self):
result = NodeRoutes.status(self)
- result['level'] = 'master'
+ result['mode'] = 'master'
return result
- def _push(self, stream):
+ def _push(self):
+ cookie = this.cookie
reply = []
- cookie = _Cookie()
-
- for packet in stream:
- src = packet['src']
- enforce(packet['dst'] == self.guid, 'Misaddressed packet')
-
- if packet.name == 'pull':
- pull_seq = cookie['pull', packet['layer'] or None]
- pull_seq.include(packet['sequence'])
- cookie.sent.setdefault(src, toolkit.Sequence())
- elif packet.name == 'files_pull':
- if self._files is not None:
- cookie['files_pull'].include(packet['sequence'])
- elif packet.name == 'diff':
- seq, ack_seq = model.merge(self.volume, packet)
- reply.append(('ack', {
- 'ack': ack_seq,
- 'sequence': seq,
- 'dst': src,
- }, None))
- sent_seq = cookie.sent.setdefault(src, toolkit.Sequence())
- sent_seq.include(ack_seq)
- elif packet.name == 'stats_diff':
- reply.append(('stats_ack', {
- 'sequence': stats_user.merge(packet),
- 'dst': src,
- }, None))
-
- return reply, cookie
-
-
-class _Cookie(list):
-
- def __init__(self, request=None):
- list.__init__(self)
-
- self.sent = {}
- self.delay = 0
-
- if request is not None:
- self.update(self._get_cookie(request, 'sugar_network_pull') or [])
- self.sent = self._get_cookie(request, 'sugar_network_sent') or {}
-
- def __repr__(self):
- return '<Cookie pull=%s sent=%r>' % (list.__repr__(self), self.sent)
-
- def update(self, that):
- for op, layer, seq in that:
- self[op, layer].include(seq)
-
- def store(self, response):
- response.set('set-cookie', [])
- if self:
- _logger.debug('Postpone %r in cookie', self)
- self._set_cookie(response, 'sugar_network_pull',
- base64.b64encode(json.dumps(self)))
- self._set_cookie(response, 'sugar_network_sent',
- base64.b64encode(json.dumps(self.sent)))
- self._set_cookie(response, 'sugar_network_delay', self.delay)
+
+ for packet in parcel.decode(
+ this.request.content_stream, this.request.content_length):
+ sender = packet['from']
+ enforce(packet['to'] == self.guid, http.BadRequest,
+ 'Misaddressed packet')
+ if packet.name == 'push':
+ seqno, push_r = this.volume.patch(packet)
+ ack_r = [] if seqno is None else [[seqno, seqno]]
+ ack = {'ack': ack_r, 'ranges': push_r, 'to': sender}
+ reply.append(('ack', ack, None))
+ cookie.setdefault('ack', {}) \
+ .setdefault(sender, []) \
+ .append((push_r, ack_r))
+ elif packet.name == 'pull':
+ cookie.setdefault('ack', {}).setdefault(sender, [])
+ ranges.include(cookie.setdefault('pull', []), packet['ranges'])
+ elif packet.name == 'request':
+ cookie.setdefault('request', []).append(packet.header)
+
+ return reply
+
+ def _pull(self):
+ processed = this.cookie.get('id')
+ if processed in self._pulls:
+ cookie = this.cookie = self._pulls[processed]
+ if not cookie:
+ return None
else:
- self._unset_cookie(response, 'sugar_network_pull')
- self._unset_cookie(response, 'sugar_network_sent')
- self._unset_cookie(response, 'sugar_network_delay')
-
- def __getitem__(self, key):
- if not isinstance(key, tuple):
- key = (key, None)
- for op, layer, seq in self:
- if (op, layer) == key:
- return seq
- seq = toolkit.Sequence()
- self.append(key + (seq,))
- return seq
-
- def _get_cookie(self, request, name):
- cookie_str = request.environ.get('HTTP_COOKIE')
- if not cookie_str:
- return
- cookie = SimpleCookie()
- cookie.load(cookie_str)
- if name not in cookie:
- return
- value = cookie.get(name).value
- if value != 'unset_%s' % name:
- return json.loads(base64.b64decode(value))
-
- def _set_cookie(self, response, name, value, age=3600):
- cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age)
- response.get('set-cookie').append(cookie)
-
- def _unset_cookie(self, response, name):
- self._set_cookie(response, name, 'unset_%s' % name, 0)
+ cookie = this.cookie
+ cookie['id'] = toolkit.uuid()
+ self._pulls[cookie['id']] = cookie
+
+ pull_r = cookie.get('pull')
+ if not pull_r:
+ return []
+
+ reply = []
+ exclude = []
+ acks = cookie.get('ack')
+ if acks:
+ acked = {}
+ for req in cookie.get('request') or []:
+ ack_r = None
+ for push_r, ack_r in acks.get(req['origin']) or []:
+ if req['ranges'] == push_r:
+ break
+ else:
+ continue
+ ranges.include(acked.setdefault(req['from'], []), ack_r)
+ reply.append(('ack', {'to': req['from'], 'ack': ack_r}, []))
+ for node, ack_ranges in acks.items():
+ acked_r = acked.setdefault(node, [])
+ for __, i in ack_ranges:
+ ranges.include(acked_r, i)
+ r = reduce(lambda x, y: ranges.intersect(x, y), acked.values())
+ ranges.include(exclude, r)
+
+ push = this.volume.diff(pull_r, exclude, one_way=True, files=[''])
+ reply.append(('push', None, push))
+
+ return reply
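
The pivotal step in _pull() above is the exclude computation: a range may be left out of the outgoing push only when every peer recorded in the cookie has acknowledged it, hence the reduce over ranges.intersect. The arithmetic in miniature, with a stand-in intersect and a two-peer cookie:

    from functools import reduce   # a builtin in Python 2; imported for clarity

    def intersect(x, y):
        # ranges present in both [lo, hi] lists; hi None means open-ended
        out = []
        for alo, ahi in x:
            for blo, bhi in y:
                lo = max(alo, blo)
                hi = (ahi if bhi is None
                      else bhi if ahi is None
                      else min(ahi, bhi))
                if hi is None or lo <= hi:
                    out.append([lo, hi])
        return out

    acked = {'slave-1': [[1, 10]], 'slave-2': [[5, None]]}
    print(reduce(intersect, acked.values()))   # [[5, 10]]
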
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index 9ead01c..90e50c2 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -16,7 +16,7 @@
import bisect
import logging
-from sugar_network import db, toolkit
+from sugar_network import db
from sugar_network.model import Release, context as base_context
from sugar_network.node import obs
from sugar_network.toolkit.router import ACL
@@ -105,71 +105,6 @@ class Context(base_context.Context):
return value
-def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None,
- ignore_documents=None, **kwargs):
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_the_only_seq = not out_seq
- if layer:
- if isinstance(layer, basestring):
- layer = [layer]
- layer.append('common')
- try:
- for resource, directory in volume.items():
- if ignore_documents and resource in ignore_documents:
- continue
- coroutine.dispatch()
- directory.commit()
- yield {'resource': resource}
- for guid, patch in directory.diff(in_seq, exclude_seq,
- layer=layer if resource == 'context' else None):
- adiff = {}
- adiff_seq = toolkit.Sequence()
- for prop, meta, seqno in patch:
- adiff[prop] = meta
- adiff_seq.include(seqno, seqno)
- if adiff:
- yield {'guid': guid, 'diff': adiff}
- out_seq.include(adiff_seq)
- if is_the_only_seq:
- # There is only one diff, so, we can stretch it to remove all holes
- out_seq.stretch()
- except StopIteration:
- pass
-
- yield {'commit': out_seq}
-
-
-def merge(volume, records):
- directory = None
- commit_seq = toolkit.Sequence()
- merged_seq = toolkit.Sequence()
- synced = False
-
- for record in records:
- resource_ = record.get('resource')
- if resource_:
- directory = volume[resource_]
- continue
-
- if 'guid' in record:
- seqno, merged = directory.merge(**record)
- synced = synced or merged
- if seqno is not None:
- merged_seq.include(seqno, seqno)
- continue
-
- commit = record.get('commit')
- if commit is not None:
- commit_seq.include(commit)
- continue
-
- if synced:
- this.broadcast({'event': 'sync'})
-
- return commit_seq, merged_seq
-
-
def solve(volume, top_context, lsb_id=None, lsb_release=None,
stability=None, requires=None):
top_context = volume['context'][top_context]
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index 6d5d200..86e4ce1 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -18,14 +18,13 @@ import time
import logging
import hashlib
from ConfigParser import ConfigParser
-from os.path import join, isdir, exists
+from os.path import join, exists
-from sugar_network import db, node, toolkit
-from sugar_network.db import blobs
+from sugar_network import db, node
from sugar_network.model import FrontRoutes, load_bundle
-from sugar_network.node import stats_user, model
+from sugar_network.node import model
# pylint: disable-msg=W0611
-from sugar_network.toolkit.router import route, preroute, postroute, ACL
+from sugar_network.toolkit.router import route, preroute, postroute, ACL, File
from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute
from sugar_network.toolkit.spec import parse_requires, parse_version
from sugar_network.toolkit.bundle import Bundle
@@ -53,10 +52,6 @@ class NodeRoutes(db.Routes, FrontRoutes):
def guid(self):
return self._guid
- @route('GET', cmd='logon', acl=ACL.AUTH)
- def logon(self):
- pass
-
@route('GET', cmd='whoami', mime_type='application/json')
def whoami(self, request, response):
roles = []
@@ -66,7 +61,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
@route('GET', cmd='status', mime_type='application/json')
def status(self):
- return {'guid': self._guid,
+ return {'guid': self.guid,
'seqno': {
'db': self.volume.seqno.value,
'releases': self.volume.releases_seqno.value,
@@ -80,49 +75,39 @@ class NodeRoutes(db.Routes, FrontRoutes):
@fallbackroute('GET', ['packages'])
def route_packages(self, request, response):
- enforce(node.files_root.value, http.BadRequest, 'Disabled')
-
- if request.path and request.path[-1] == 'updates':
- root = join(node.files_root.value, *request.path[:-1])
- enforce(isdir(root), http.NotFound, 'Directory was not found')
+ path = this.request.path
+ if path and path[-1] == 'updates':
result = []
last_modified = 0
- for filename in os.listdir(root):
- if '.' in filename:
+ for blob in this.volume.blobs.diff(
+ [[this.request.if_modified_since + 1, None]],
+ join(*path[:-1]), recursive=False):
+ if '.' in blob.name:
continue
- path = join(root, filename)
- mtime = int(os.stat(path).st_mtime)
- if mtime > request.if_modified_since:
- result.append(filename)
- last_modified = max(last_modified, mtime)
+ result.append(blob.name)
+ last_modified = max(last_modified, blob.mtime)
response.content_type = 'application/json'
if last_modified:
response.last_modified = last_modified
return result
- path = join(node.files_root.value, *request.path)
- enforce(exists(path), http.NotFound, 'File was not found')
- if not isdir(path):
- return toolkit.iter_file(path)
-
- result = []
- for filename in os.listdir(path):
- if filename.endswith('.rpm') or filename.endswith('.deb'):
- continue
- result.append(filename)
-
- response.content_type = 'application/json'
- return result
+ blob = this.volume.blobs.get(join(*path))
+ if isinstance(blob, File):
+ return blob
+ else:
+ response.content_type = 'application/json'
+ return [i.name for i in blob if '.' not in i.name]
@route('POST', ['context'], cmd='submit',
arguments={'initial': False},
mime_type='application/json', acl=ACL.AUTH)
- def submit_release(self, request, initial):
- blob = blobs.post(request.content_stream, request.content_type)
+ def submit_release(self, initial):
+ blob = this.volume.blobs.post(
+ this.request.content_stream, this.request.content_type)
try:
context, release = load_bundle(blob, initial=initial)
except Exception:
- blobs.delete(blob.digest)
+ this.volume.blobs.delete(blob.digest)
raise
this.call(method='POST', path=['context', context, 'releases'],
content_type='application/json', content=release)
@@ -156,29 +141,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
arguments={'requires': list})
def get_clone(self, request, response):
solution = self.solve(request)
- return blobs.get(solution[request.guid]['blob'])
-
- @route('GET', ['user', None], cmd='stats-info',
- mime_type='application/json', acl=ACL.AUTH)
- def user_stats_info(self, request):
- status = {}
- for rdb in stats_user.get_rrd(request.guid):
- status[rdb.name] = rdb.last + stats_user.stats_user_step.value
-
- # TODO Process client configuration in more general manner
- return {'enable': True,
- 'step': stats_user.stats_user_step.value,
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'status': status,
- }
-
- @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH)
- def user_stats_upload(self, request):
- name = request.content['name']
- values = request.content['values']
- rrd = stats_user.get_rrd(request.guid)
- for timestamp, values in values:
- rrd[name].put(values, timestamp)
+ return this.volume.blobs.get(solution[request.guid]['blob'])
@preroute
def preroute(self, op, request, response):
@@ -204,7 +167,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
def on_create(self, request, props):
if request.resource == 'user':
- with file(blobs.get(props['pubkey']).path) as f:
+ with file(this.volume.blobs.get(props['pubkey']).path) as f:
props['guid'] = str(hashlib.sha1(f.read()).hexdigest())
db.Routes.on_create(self, request, props)
@@ -223,7 +186,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
from M2Crypto import RSA
pubkey = self.volume['user'][auth.login]['pubkey']
- key = RSA.load_pub_key(blobs.get(pubkey).path)
+ key = RSA.load_pub_key(this.volume.blobs.get(pubkey).path)
data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest()
enforce(key.verify(data, auth.signature.decode('hex')),
http.Forbidden, 'Bad credentials')
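
With the blobs-backed route above, the packages 'updates' listing is driven by the standard If-Modified-Since conditional instead of a files_root scan. A hedged client-side sketch; the host and distro path are made up, and urllib2 is the Python 2 standard-library client:

    import urllib2

    req = urllib2.Request(
        'http://node.example.org/packages/Fedora-14/updates',
        headers={'If-Modified-Since': 'Sat, 08 Mar 2014 00:00:00 GMT'})
    reply = urllib2.urlopen(req)
    print(reply.read())   # JSON list of package files touched since then
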
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 2d60ea8..333e6ea 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -21,13 +21,12 @@ from urlparse import urlsplit
from os.path import join, dirname, exists, isabs
from gettext import gettext as _
-from sugar_network import node, toolkit
-from sugar_network.client import api_url
-from sugar_network.node import sync, stats_user, files, model
+from sugar_network import toolkit
+from sugar_network.node import master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit import http, parcel, ranges, enforce
_logger = logging.getLogger('node.slave')
@@ -35,148 +34,94 @@ _logger = logging.getLogger('node.slave')
class SlaveRoutes(NodeRoutes):
- def __init__(self, key_path, volume_):
- self._auth = http.SugarAuth(key_path)
- NodeRoutes.__init__(self, self._auth.login, volume_)
-
- self._push_seq = toolkit.PersistentSequence(
- join(volume_.root, 'push.sequence'), [1, None])
- self._pull_seq = toolkit.PersistentSequence(
- join(volume_.root, 'pull.sequence'), [1, None])
- self._files_seq = toolkit.PersistentSequence(
- join(volume_.root, 'files.sequence'), [1, None])
- self._master_guid = urlsplit(api_url.value).netloc
- self._offline_session = None
-
- @route('POST', cmd='online-sync', acl=ACL.LOCAL)
+ def __init__(self, volume, **kwargs):
+ self._creds = http.SugarAuth(
+ join(volume.root, 'etc', 'private', 'node'))
+ NodeRoutes.__init__(self, self._creds.login, volume=volume, **kwargs)
+ vardir = join(volume.root, 'var')
+ self._push_r = toolkit.Bin(join(vardir, 'push.ranges'), [[1, None]])
+ self._pull_r = toolkit.Bin(join(vardir, 'pull.ranges'), [[1, None]])
+ self._master_guid = urlsplit(master_api.value).netloc
+
+ @route('POST', cmd='online_sync', acl=ACL.LOCAL,
+ arguments={'no_pull': bool})
def online_sync(self, no_pull=False):
- conn = http.Connection(api_url.value, auth=self._auth)
-
- # TODO `http.Connection` should handle re-POSTing without
- # loosing payload after authentication
- conn.get(cmd='logon')
-
- push = [('diff', None, model.diff(self.volume, self._push_seq))]
- if not no_pull:
- push.extend([
- ('pull', {
- 'sequence': self._pull_seq,
- 'layer': node.sync_layers.value,
- }, None),
- ('files_pull', {'sequence': self._files_seq}, None),
- ])
- if stats_user.stats_user.value:
- push.append(('stats_diff', None, stats_user.diff()))
+ self._export(not no_pull)
+ conn = http.Connection(master_api.value)
response = conn.request('POST',
- data=sync.encode(push, src=self.guid, dst=self._master_guid),
+ data=parcel.encode(self._export(not no_pull), header={
+ 'from': self.guid,
+ 'to': self._master_guid,
+ }),
params={'cmd': 'sync'},
headers={'Transfer-Encoding': 'chunked'})
- self._import(sync.decode(response.raw), None)
+ self._import(parcel.decode(response.raw))
- @route('POST', cmd='offline-sync', acl=ACL.LOCAL)
+ @route('POST', cmd='offline_sync', acl=ACL.LOCAL)
def offline_sync(self, path):
- enforce(node.sync_layers.value,
- '--sync-layers is not specified, the full master dump '
- 'might be too big and should be limited')
- enforce(isabs(path), 'Argument \'path\' should be an absolute path')
-
- _logger.debug('Start %r synchronization session in %r',
- self._offline_session, path)
+ enforce(isabs(path), "Argument 'path' is not an absolute path")
+ _logger.debug('Start offline synchronization in %r', path)
if not exists(path):
os.makedirs(path)
- try:
- self._offline_session = self._offline_sync(path,
- **(self._offline_session or {}))
- except Exception:
- toolkit.exception(_logger, 'Failed to complete synchronization')
- self._offline_session = None
- raise
-
- if self._offline_session is None:
- _logger.debug('Synchronization completed')
- else:
- _logger.debug('Postpone synchronization with %r session',
- self._offline_session)
-
- def status(self):
- result = NodeRoutes.status(self)
- result['level'] = 'slave'
- return result
-
- def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None):
- push = []
-
- if push_seq is None:
- push_seq = toolkit.Sequence(self._push_seq)
- if stats_seq is None:
- stats_seq = {}
- if session is None:
- session = toolkit.uuid()
- push.append(('pull', {
- 'sequence': self._pull_seq,
- 'layer': node.sync_layers.value,
- }, None))
- push.append(('files_pull', {'sequence': self._files_seq}, None))
-
this.broadcast({
'event': 'sync_progress',
'progress': _('Reading sneakernet packages'),
})
- self._import(sync.sneakernet_decode(path), push_seq)
-
- offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
- if exists(offline_script):
- shutil.copy(offline_script, path)
+ requests = self._import(parcel.decode_dir(path))
this.broadcast({
'event': 'sync_progress',
'progress': _('Generating new sneakernet package'),
})
+ offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
+ if exists(offline_script):
+ shutil.copy(offline_script, path)
+ parcel.encode_dir(requests + self._export(True), root=path, header={
+ 'from': self.guid,
+ 'to': self._master_guid,
+ })
+
+ _logger.debug('Synchronization completed')
+
+ def status(self):
+ result = NodeRoutes.status(self)
+ result['mode'] = 'slave'
+ return result
+
+ def _import(self, package):
+ requests = []
- diff_seq = toolkit.Sequence([])
- push.append(('diff', None,
- model.diff(self.volume, push_seq, diff_seq)))
- if stats_user.stats_user.value:
- push.append(('stats_diff', None, stats_user.diff(stats_seq)))
- complete = sync.sneakernet_encode(push, root=path,
- src=self.guid, dst=self._master_guid, api_url=api_url.value,
- session=session)
- if not complete:
- push_seq.exclude(diff_seq)
- return {'push_seq': push_seq,
- 'stats_seq': stats_seq,
- 'session': session,
- }
-
- def _import(self, package, push_seq):
for packet in package:
- from_master = (packet['src'] == self._master_guid)
- addressed = (packet['dst'] == self.guid)
-
- if packet.name == 'diff':
- _logger.debug('Processing %r', packet)
- seq, __ = model.merge(self.volume, packet, shift_seqno=False)
- if from_master and seq:
- self._pull_seq.exclude(seq)
- self._pull_seq.commit()
-
- elif from_master:
- if packet.name == 'ack' and addressed:
- _logger.debug('Processing %r', packet)
- if push_seq:
- push_seq.exclude(packet['sequence'])
- self._pull_seq.exclude(packet['ack'])
- self._pull_seq.commit()
- self._push_seq.exclude(packet['sequence'])
- self._push_seq.commit()
- elif packet.name == 'stats_ack' and addressed:
- _logger.debug('Processing %r', packet)
- stats_user.commit(packet['sequence'])
- elif packet.name == 'files_diff':
- _logger.debug('Processing %r', packet)
- seq = files.merge(node.files_root.value, packet)
- if seq:
- self._files_seq.exclude(seq)
- self._files_seq.commit()
+ sender = packet['from']
+ from_master = (sender == self._master_guid)
+ if packet.name == 'push':
+ seqno, committed = this.volume.patch(packet)
+ if seqno is not None:
+ if from_master:
+ ranges.exclude(self._pull_r.value, committed)
+ self._pull_r.commit()
+ else:
+ requests.append(('request', {
+ 'origin': sender,
+ 'ranges': committed,
+ }, []))
+ ranges.exclude(self._push_r.value, seqno, seqno)
+ self._push_r.commit()
+ elif packet.name == 'ack' and from_master and \
+ packet['to'] == self.guid:
+ ranges.exclude(self._pull_r.value, packet['ack'])
+ self._pull_r.commit()
+ if packet['ranges']:
+ ranges.exclude(self._push_r.value, packet['ranges'])
+ self._push_r.commit()
+
+ return requests
+
+ def _export(self, pull):
+ export = []
+ if pull:
+ export.append(('pull', {'ranges': self._pull_r.value}, None))
+ export.append(('push', None, self.volume.diff(self._push_r.value)))
+ return export
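
For orientation, the payload online_sync() now sends is just two packets; the record shapes inside the push stream are the ones Volume.diff() yields. An illustration with made-up values:

    header = {'from': 'slave-key', 'to': 'node.sugarlabs.org'}
    packets = [
        # 'pull': the ranges this slave still misses from the master
        ('pull', {'ranges': [[108, None]]}, None),
        # 'push': local changes, streamed as diff records
        ('push', None, iter([
            {'guid': 'chat', 'patch': {
                'title': {'mtime': 1394000000, 'value': 'Chat'}}},
            {'commit': [[42, 56]]},   # ranges covered by the records above
        ])),
    ]
    # parcel.encode(packets, header=header) turns this into the wire body
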
diff --git a/sugar_network/node/stats_user.py b/sugar_network/node/stats_user.py
deleted file mode 100644
index 0b96449..0000000
--- a/sugar_network/node/stats_user.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import logging
-from os.path import join, exists, isdir
-
-from sugar_network import node, toolkit
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.toolkit import Option, pylru, enforce
-
-
-stats_user = Option(
- 'accept personalized users statistics',
- default=False, type_cast=Option.bool_cast, action='store_true')
-
-stats_user_step = Option(
- 'step interval in seconds for users\' RRD databases',
- default=60, type_cast=int)
-
-stats_user_rras = Option(
- 'comma separated list of RRAs for users\' RRD databases',
- default=[
- 'RRA:AVERAGE:0.5:1:4320', # one day with 60s step
- 'RRA:AVERAGE:0.5:5:2016', # one week with 5min step
- ],
- type_cast=Option.list_cast, type_repr=Option.list_repr)
-
-_logger = logging.getLogger('node.stats_user')
-_user_cache = pylru.lrucache(32)
-
-
-def get_rrd(user):
- if user in _user_cache:
- return _user_cache[user]
- else:
- rrd = _user_cache[user] = Rrd(_rrd_path(user),
- stats_user_step.value, stats_user_rras.value)
- return rrd
-
-
-def diff(in_info=None):
- if in_info is None:
- in_info = {}
- out_info = {}
-
- try:
- for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')):
- in_info.setdefault(user, {})
- out_info.setdefault(user, {})
-
- for db in rrd:
- yield {'db': db.name, 'user': user}
-
- in_seq = in_info[user].get(db.name)
- if in_seq is None:
- in_seq = toolkit.PersistentSequence(
- join(rrd.root, db.name + '.push'), [1, None])
- in_info[user][db.name] = in_seq
- elif in_seq is not toolkit.Sequence:
- in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq)
- out_seq = out_info[user].setdefault(db.name,
- toolkit.Sequence())
-
- for start, end in in_seq:
- for timestamp, values in \
- db.get(max(start, db.first), end or db.last):
- yield {'timestamp': timestamp, 'values': values}
- in_seq.exclude(start, timestamp)
- out_seq.include(start, timestamp)
- start = timestamp
- except StopIteration:
- pass
-
- yield {'commit': out_info}
-
-
-def merge(packet):
- db = None
- seq = None
-
- for record in packet:
- if 'db' in record:
- db = get_rrd(record['user'])[record['db']]
- elif 'commit' in record:
- seq = record['commit']
- else:
- enforce(db is not None, 'Malformed user stats diff')
- db.put(record['values'], record['timestamp'])
-
- return seq
-
-
-def commit(info):
- for user, dbs in info.items():
- for db_name, merged in dbs.items():
- seq = toolkit.PersistentSequence(
- _rrd_path(user, db_name + '.push'), [1, None])
- seq.exclude(merged)
- seq.commit()
-
-
-def _walk_rrd(root):
- if not exists(root):
- return
- for users_dirname in os.listdir(root):
- users_dir = join(root, users_dirname)
- if not isdir(users_dir):
- continue
- for user in os.listdir(users_dir):
- yield user, Rrd(join(users_dir, user), stats_user_step.value)
-
-
-def _rrd_path(user, *args):
- return join(node.stats_root.value, 'user', user[:2], user, *args)
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index d3d9b88..89a9d0f 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -523,7 +523,7 @@ class Bin(object):
json.dump(self.value, f)
f.flush()
os.fsync(f.fileno())
- self._orig_value = self.value
+ self._orig_value = deepcopy(self.value)
return True
def __enter__(self):
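
The one-word change above fixes a subtle aliasing bug, assuming Bin compares self.value against _orig_value to decide whether a flush is due: range lists are mutated in place (see ranges.exclude in the slave code), so storing a reference instead of a snapshot made every later commit look like a no-op. In miniature:

    from copy import deepcopy

    value = [[1, None]]
    orig = value                # old behaviour: an alias, not a snapshot
    value[0][0] = 6             # in-place edit, as ranges.exclude() does
    print(orig == value)        # True  -> looks clean, commit skipped

    value = [[1, None]]
    orig = deepcopy(value)      # new behaviour: a real snapshot
    value[0][0] = 6
    print(orig == value)        # False -> dirty, commit proceeds
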
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index d280035..254e6f3 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -112,8 +112,8 @@ class Connection(object):
_Session = None
- def __init__(self, api_url='', auth=None, max_retries=0, **session_args):
- self.api_url = api_url
+ def __init__(self, api='', auth=None, max_retries=0, **session_args):
+ self.api = api
self.auth = auth
self._max_retries = max_retries
self._session_args = session_args
@@ -121,7 +121,7 @@ class Connection(object):
self._nonce = None
def __repr__(self):
- return '<Connection api_url=%s>' % self.api_url
+ return '<Connection api=%s>' % self.api
def __enter__(self):
return self
@@ -203,7 +203,7 @@ class Connection(object):
if not path:
path = ['']
if not isinstance(path, basestring):
- path = '/'.join([i.strip('/') for i in [self.api_url] + path])
+ path = '/'.join([i.strip('/') for i in [self.api] + path])
try_ = 0
while True:
@@ -283,7 +283,7 @@ class Connection(object):
break
path = reply.headers['location']
if path.startswith('/'):
- path = self.api_url + path
+ path = self.api + path
if request.method != 'HEAD':
if reply.headers.get('Content-Type') == 'application/json':
@@ -380,7 +380,7 @@ class SugarAuth(object):
key_dir = dirname(self._key_path)
if exists(self._key_path):
- if os.stat(key_dir) & 077:
+ if os.stat(key_dir).st_mode & 077:
os.chmod(key_dir, 0700)
self._key = RSA.load_key(self._key_path)
return
@@ -432,7 +432,7 @@ class _Subscription(object):
if try_ == 0:
raise
toolkit.exception('Failed to read from %r subscription, '
- 'will resubscribe', self._client.api_url)
+ 'will resubscribe', self._client.api)
self._content = None
return _parse_event(line)
@@ -441,7 +441,7 @@ class _Subscription(object):
return self._content
params.update(self._condition)
params['cmd'] = 'subscribe'
- _logger.debug('Subscribe to %r, %r', self._client.api_url, params)
+ _logger.debug('Subscribe to %r, %r', self._client.api, params)
response = self._client.request('GET', params=params)
self._content = response.raw
return self._content
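
Aside from the api_url -> api rename, the SugarAuth hunk fixes a real crash: os.stat() returns a stat_result, which cannot be bitwise-ANDed; the permission bits live in st_mode. The corrected check in isolation:

    import os

    mode = os.stat('.').st_mode
    if mode & 0o77:             # any group/other permission bits set?
        print('key directory is too permissive; chmod it to 0700')
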
diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py
deleted file mode 100644
index bafe6d1..0000000
--- a/sugar_network/toolkit/rrd.py
+++ /dev/null
@@ -1,296 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""Convenient access to RRD databases."""
-
-import re
-import os
-import time
-import json
-import bisect
-import logging
-from os.path import exists, join, splitext
-
-
-_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$')
-_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$')
-
-_FETCH_PAGE = 256
-
-_logger = logging.getLogger('rrd')
-_rrdtool = None
-
-
-class Rrd(object):
-
- def __init__(self, root, step, rras=None):
- global _rrdtool
-
- import rrdtool
- _rrdtool = rrdtool
-
- self._root = root
- self._step = step
- # rrdtool knows nothing about `unicode`
- self._rras = [i.encode('utf8') for i in rras or []]
- self._dbsets = {}
-
- if not exists(self._root):
- os.makedirs(self._root)
-
- for filename in os.listdir(self._root):
- match = _DB_FILENAME_RE.match(filename)
- if match is not None:
- name, revision = match.groups()
- self.get(name).load(filename, int(revision or 0))
-
- def __iter__(self):
- for i in self._dbsets.values():
- yield i
-
- def __getitem__(self, name):
- return self.get(name)
-
- def __contains__(self, name):
- return name in self._dbsets
-
- @property
- def root(self):
- return self._root
-
- @property
- def step(self):
- return self._step
-
- def get(self, name):
- db = self._dbsets.get(name)
- if db is None:
- db = _DbSet(self._root, name, self._step, self._rras)
- self._dbsets[name] = db
- return db
-
-
-class _DbSet(object):
-
- def __init__(self, root, name, step, rras):
- self._root = root
- self.name = name
- self._step = step
- self._rras = rras
- self._revisions = []
- self._fields = None
- self._field_names = None
- self.__db = None
-
- @property
- def fields(self):
- return self._field_names
-
- @fields.setter
- def fields(self, fields):
- self._field_names = fields.keys()
- self._field_names.sort()
- self._fields = [str(fields[i]) for i in self._field_names]
- _logger.debug('Set %r fields for %r', self._fields, self.name)
-
- @property
- def first(self):
- if not self._revisions:
- return
- return self._revisions[0].first
-
- @property
- def last(self):
- if not self._revisions:
- return
- return self._revisions[-1].last
-
- @property
- def last_ds(self):
- if not self._revisions or not self._field_names:
- return {}
- info = _rrdtool.info(self._revisions[-1].path)
- result = {}
- for field in self._field_names:
- result[field] = float(info.get('ds[%s].last_ds' % field) or 0)
- return result
-
- def load(self, filename, revision):
- _logger.debug('Load %s database from %s with revision %s',
- filename, self._root, revision)
- db = _Db(join(self._root, filename), revision)
- bisect.insort(self._revisions, db)
- return db
-
- def put(self, values, timestamp=None):
- if not self.fields:
- _logger.debug('Parse fields from the first put')
- self.fields = dict([
- (i, 'DS:%s:GAUGE:%s:U:U' % (i, self._step * 2))
- for i in values])
-
- if not timestamp:
- timestamp = int(time.time())
- timestamp = timestamp / self._step * self._step
-
- db = self._get_db(timestamp)
- if db is None:
- return
-
- if timestamp <= db.last:
- _logger.warning('Database %s updated at %s, %s in the past',
- db.path, db.last, timestamp)
- return
-
- value = [str(timestamp)]
- for name in self._field_names:
- value.append(str(values[name]))
-
- _logger.debug('Put %r to %s', value, db.path)
-
- db.put(':'.join(value), timestamp)
-
- def get(self, start=None, end=None, resolution=None):
- if not self._revisions:
- return
-
- if not resolution:
- resolution = self._step
-
- if start is None:
- start = self._revisions[0].first
- if end is None:
- end = self._revisions[-1].last
-
- revisions = []
- for db in reversed(self._revisions):
- revisions.append(db)
- if db.last <= start:
- break
-
- start = start - start % self._step - self._step
- last = min(end, start + _FETCH_PAGE * resolution)
- last -= last % self._step + self._step
-
- for db in reversed(revisions):
- db_end = min(last, db.last - self._step)
- if start > db_end:
- break
- (row_start, start, row_step), __, rows = _rrdtool.fetch(
- str(db.path),
- 'AVERAGE',
- '--start', str(start),
- '--end', str(db_end),
- '--resolution', str(resolution))
- for raw_row in rows:
- row_start += row_step
- if row_start > end:
- break
- row = {}
- for i, value in enumerate(raw_row):
- row[db.field_names[i]] = value or .0
- yield row_start, row
- start = db_end + 1
-
- def _get_db(self, timestamp):
- if self.__db is None and self._fields:
- if self._revisions:
- db = self._revisions[-1]
- if db.last >= timestamp:
- _logger.warning(
- 'Database %s updated at %s, %s in the past',
- db.path, db.last, timestamp)
- return None
- if db.step != self._step or db.rras != self._rras or \
- db.field_names != self._field_names:
- db = self._create_db(db.revision + 1, db.last)
- else:
- db = self._create_db(0, timestamp)
- self.__db = db
- return self.__db
-
- def _create_db(self, revision, timestamp):
- filename = self.name
- if revision:
- filename += '-%s' % revision
- filename += '.rrd'
-
- _logger.debug('Create %s database in %s start=%s step=%s',
- filename, self._root, timestamp, self._step)
-
- _rrdtool.create(
- str(join(self._root, filename)),
- '--start', str(timestamp - self._step),
- '--step', str(self._step),
- *(self._fields + self._rras))
-
- return self.load(filename, revision)
-
-
-class _Db(object):
-
- def __init__(self, path, revision=0):
- self.path = str(path)
- self._meta_path = splitext(path)[0] + '.meta'
- self.revision = revision
- self.fields = []
- self.field_names = []
- self.rras = []
-
- info = _rrdtool.info(self.path)
- self.step = info['step']
- self.first = 0
- self.last = info['last_update']
-
- fields = {}
- rras = {}
-
- for key, value in info.items():
- match = _INFO_RE.match(key)
- if match is None:
- continue
- prefix, key, prop = match.groups()
- if prefix == 'ds':
- fields.setdefault(key, {})
- fields[key][prop] = value
- if prefix == 'rra':
- rras.setdefault(key, {})
- rras[key][prop] = value
-
- for index in sorted([int(i) for i in rras.keys()]):
- rra = rras[str(index)]
- self.rras.append(
- 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra)
-
- for name in sorted(fields.keys()):
- props = fields[name]
- props['name'] = name
- self.fields.append(props)
- self.field_names.append(name)
-
- if exists(self._meta_path):
- with file(self._meta_path) as f:
- self.first = json.load(f).get('first')
-
- def put(self, value, timestamp):
- if not self.first:
- with file(self._meta_path, 'w') as f:
- json.dump({'first': timestamp}, f)
- self.first = timestamp
- _rrdtool.update(self.path, str(value))
- self.last = _rrdtool.info(self.path)['last_update']
-
- def __cmp__(self, other):
- return cmp(self.revision, other.revision)
diff --git a/tests/__init__.py b/tests/__init__.py
index cc9ec01..dc10cdb 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -27,7 +27,7 @@ from sugar_network.model.user import User
from sugar_network.model.context import Context
from sugar_network.model.post import Post
from sugar_network.node.master import MasterRoutes
-from sugar_network.node import stats_user, obs, slave
+from sugar_network.node import obs, slave
from requests import adapters
@@ -87,11 +87,11 @@ class Test(unittest.TestCase):
db.index_flush_threshold.value = 1
node.find_limit.value = 1024
node.data_root.value = tmpdir
- node.stats_root.value = tmpdir + '/stats'
node.port.value = 8888
+ node.master_api.value = 'http://127.0.0.1:7777'
db.index_write_queue.value = 10
client.local_root.value = tmpdir
- client.api_url.value = 'http://127.0.0.1:8888'
+ client.api.value = 'http://127.0.0.1:7777'
client.mounts_root.value = None
client.ipc_port.value = 5555
client.layers.value = None
@@ -103,9 +103,6 @@ class Test(unittest.TestCase):
mountpoints._connects.clear()
mountpoints._found.clear()
mountpoints._COMPLETE_MOUNT_TIMEOUT = .1
- stats_user.stats_user.value = False
- stats_user.stats_user_step.value = 1
- stats_user._user_cache.clear()
obs._conn = None
obs._repos = {'base': [], 'presolve': []}
http._RECONNECTION_NUMBER = 0
@@ -265,10 +262,11 @@ class Test(unittest.TestCase):
def start_master(self, classes=None, routes=MasterRoutes):
if classes is None:
classes = [User, Context, Post]
+ #self.touch(('master/etc/private/node', file(join(root, 'data', NODE_UID)).read()))
self.node_volume = db.Volume('master', classes)
- self.node_routes = routes('master', volume=self.node_volume)
+ self.node_routes = routes(volume=self.node_volume)
self.node_router = Router(self.node_routes)
- self.node = coroutine.WSGIServer(('127.0.0.1', 8888), self.node_router)
+ self.node = coroutine.WSGIServer(('127.0.0.1', 7777), self.node_router)
coroutine.spawn(self.node.serve_forever)
coroutine.dispatch(.1)
this.volume = self.node_volume
@@ -281,8 +279,7 @@ class Test(unittest.TestCase):
def node():
volume = db.Volume('master', classes)
- self.node_routes = routes('guid', volume)
- node = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.node_routes))
+ node = coroutine.WSGIServer(('127.0.0.1', 7777), Router(routes(volume=volume)))
node.serve_forever()
pid = self.fork(node)
@@ -293,7 +290,7 @@ class Test(unittest.TestCase):
if classes is None:
classes = [User, Context]
volume = db.Volume('client', classes)
- self.client_routes = routes(volume, client.api_url.value)
+ self.client_routes = routes(volume, client.api.value)
self.client = coroutine.WSGIServer(
('127.0.0.1', client.ipc_port.value), Router(self.client_routes))
coroutine.spawn(self.client.serve_forever)
@@ -306,7 +303,7 @@ class Test(unittest.TestCase):
classes = [User, Context]
self.start_master(classes)
volume = db.Volume('client', classes)
- self.client_routes = ClientRoutes(volume, client.api_url.value)
+ self.client_routes = ClientRoutes(volume, client.api.value)
self.wait_for_events(self.client_routes, event='inline', state='online').wait()
self.client = coroutine.WSGIServer(
('127.0.0.1', client.ipc_port.value), Router(self.client_routes))
@@ -324,33 +321,6 @@ class Test(unittest.TestCase):
this.volume = self.home_volume
return IPCConnection()
- def restful_server(self, classes=None):
- if not exists('remote'):
- os.makedirs('remote')
-
- logfile = file('remote/log', 'a')
- sys.stdout = sys.stderr = logfile
-
- for handler in logging.getLogger().handlers:
- logging.getLogger().removeHandler(handler)
- logging.basicConfig(level=logging.DEBUG)
-
- db.index_flush_timeout.value = 0
- db.index_flush_threshold.value = 1
- node.find_limit.value = 1024
- db.index_write_queue.value = 10
-
- volume = db.Volume('remote', classes or [User, Context])
- self.node_routes = MasterRoutes('guid', volume)
- httpd = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.node_routes))
- try:
- coroutine.joinall([
- coroutine.spawn(httpd.serve_forever),
- ])
- finally:
- httpd.stop()
- volume.close()
-
def wait_for_events(self, cp, **condition):
trigger = coroutine.AsyncResult()
@@ -412,3 +382,5 @@ oLqnwHwnk4DFkdO7ZwIDAQAB
-----END PUBLIC KEY-----
"""
UID2 = 'd820a3405d6aadf2cf207f6817db2a79f8fa07aa'
+
+NODE_UID = 'c41529f1d629e60bdc21434011133f2c8f65f643'
diff --git a/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643 b/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643
new file mode 100644
index 0000000..f76506d
--- /dev/null
+++ b/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQCv0awK/tGfUHaxQfQ5dB/he8ODgGClqZz1toFNLkUm5qVmWA97
+yszv+EVpDLLm+XtMwNhMBUIH3JcCrNiXOJJBW+Aj5pQ3Rby2RBZTJxasiCujNGq3
+8QKOmCLCp0tqQ2VyXtlNn+azg7Lz05RdzrprnDrwu/w7Q3UmY0s92G6sfQIDAQAB
+AoGBAKYxfvvhxUpf5+JEYtQQRbaBo91g83qE6t6ExpKrQyizavNkGDa/C5tmRk43
+d8DHYNq7i3nImpMN3BzmP9Ip4mydORw1hGn79P20IeTozTeeBDifDMqxlopeVxeg
+7VbYPyYJAU5Gz5dC3JXdf0qeBfPB9Rb5E3E8S+MPl3pFncTtAkEA5k3o1ja2lhCv
+pe2/TrL8YEEhYY2KsB3DiL29CMuQwGePAd1GG5uSlZWhGC5zAbekmQKHa8oMHC7h
+p0ZjtqYZKwJBAMNvhUiwDd+TQejQAtXbD1dk62TPvTnQW8dD558qNLlz/ernNpBj
+v9Y3oq+bjkDzvpheY4K5xrg9W4TJancLLPcCQHjeLLO4FU1exoCD7SJFh3SQ2g8T
+tNTHWia6xaoHBBomf4RP+ApnNKAy3lANmKgvFECFdkMY0BA+folGxPBH7e8CQQCF
+FKSq+Y+I5gqkkTjNDW1l8ofETx2oh7RnfVr07FWYz15hne5u5i3Unm/+qqt0mUX5
+FZUniH/EJ6vxQQJpa8fDAkB6bV2mTrvx/Atk3ADIVWfI5PozZt9NoJKrMuWhoCvt
+kp5Rs9gtr/hqo/mPLCTy3r+v+suAji/VS/OKApeRhmCz
+-----END RSA PRIVATE KEY-----
diff --git a/tests/integration/master_personal.py b/tests/integration/master_personal.py
index ec87942..7bfa6aa 100755
--- a/tests/integration/master_personal.py
+++ b/tests/integration/master_personal.py
@@ -38,7 +38,7 @@ class MasterPersonalTest(tests.Test):
'--obs-url=',
])
self.client_pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start',
- '--api-url=http://127.0.0.1:8100', '--cachedir=client/tmp',
+ '--api=http://127.0.0.1:8100', '--cachedir=client/tmp',
'-DDD', '--rundir=client/run', '--server-mode', '--layers=pilot',
'--local-root=client',
'--port=8101', '--index-flush-threshold=1',
diff --git a/tests/integration/master_slave.py b/tests/integration/master_slave.py
index 0bf9f4f..c9981df 100755
--- a/tests/integration/master_slave.py
+++ b/tests/integration/master_slave.py
@@ -38,7 +38,7 @@ class MasterSlaveTest(tests.Test):
'--obs-url=',
])
self.slave_pid = self.popen([join(src_root, 'sugar-network-node'), '-F', 'start',
- '--api-url=http://127.0.0.1:8100',
+ '--api=http://127.0.0.1:8100',
'--port=8101', '--data-root=slave/db', '--cachedir=slave/tmp',
'-DDD', '--rundir=slave/run', '--files-root=slave/files',
'--stats-root=slave/stats', '--stats-user', '--stats-user-step=1',
diff --git a/tests/integration/node_client.py b/tests/integration/node_client.py
index 64fe97c..5dac3dc 100755
--- a/tests/integration/node_client.py
+++ b/tests/integration/node_client.py
@@ -134,14 +134,14 @@ class NodeClientTest(tests.Test):
self.assertEqual(['clone'], json.load(file('client/db/context/co/context/layer'))['value'])
def cli(self, cmd, stdin=None):
- cmd = ['sugar-network', '--local-root=client', '--ipc-port=5101', '--api-url=http://127.0.0.1:8100', '-DDD'] + cmd
+ cmd = ['sugar-network', '--local-root=client', '--ipc-port=5101', '--api=http://127.0.0.1:8100', '-DDD'] + cmd
if '--anonymous' not in cmd and not self.client_pid:
self.client_pid = self.popen([join(src_root, 'sugar-network-client'),
'-DDDF', 'start',
'--local-root=client',
'--mounts-root=mnt', '--cachedir=tmp', '--ipc-port=5101',
- '--api-url=http://127.0.0.1:8100',
+ '--api=http://127.0.0.1:8100',
])
coroutine.sleep(2)
ipc = Connection('http://127.0.0.1:5101')
diff --git a/tests/integration/node_packages.py b/tests/integration/node_packages.py
index 91b6f16..176fbc1 100755
--- a/tests/integration/node_packages.py
+++ b/tests/integration/node_packages.py
@@ -98,7 +98,7 @@ class NodePackagesSlaveTest(tests.Test):
urllib2.urlopen('http://127.0.0.1:8100/packages/presolve/OLPC-11.3.1/rpm').read())
pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start',
- '--api-url=http://127.0.0.1:8100', '--cachedir=master.client/tmp',
+ '--api=http://127.0.0.1:8100', '--cachedir=master.client/tmp',
'-DDD', '--rundir=master.client/run', '--layers=pilot',
'--local-root=master.client',
'--index-flush-threshold=1', '--ipc-port=8200',
@@ -116,7 +116,7 @@ class NodePackagesSlaveTest(tests.Test):
# From slave
self.pids.append(self.popen([join(src_root, 'sugar-network-node'), '-F', 'start',
- '--api-url=http://127.0.0.1:8100',
+ '--api=http://127.0.0.1:8100',
'--port=8101', '--data-root=slave/db', '--cachedir=slave/tmp',
'-DDD', '--rundir=slave/run', '--files-root=slave/files',
'--stats-root=slave/stats', '--stats-user', '--stats-user-step=1',
@@ -136,7 +136,7 @@ class NodePackagesSlaveTest(tests.Test):
urllib2.urlopen('http://127.0.0.1:8101/packages/presolve/OLPC-11.3.1/rpm').read())
pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start',
- '--api-url=http://127.0.0.1:8101', '--cachedir=master.client/tmp',
+ '--api=http://127.0.0.1:8101', '--cachedir=master.client/tmp',
'-DDD', '--rundir=master.client/run', '--layers=pilot',
'--local-root=master.client',
'--index-flush-threshold=1', '--ipc-port=8200',
@@ -155,7 +155,7 @@ class NodePackagesSlaveTest(tests.Test):
os.makedirs('client/mnt/disk/sugar-network')
self.pids.append(self.popen([join(src_root, 'sugar-network-client'), '-F', 'start',
- '--api-url=http://127.0.0.1:8100', '--cachedir=client/tmp',
+ '--api=http://127.0.0.1:8100', '--cachedir=client/tmp',
'-DDD', '--rundir=client/run', '--server-mode', '--layers=pilot',
'--local-root=client',
'--port=8102', '--index-flush-threshold=1',
diff --git a/tests/regression/api.py b/tests/regression/api.py
index ce89952..05bf21c 100755
--- a/tests/regression/api.py
+++ b/tests/regression/api.py
@@ -101,7 +101,7 @@ class Api(tests.Test):
self.client_pid = self.popen([join(PROD_ROOT, 'sugar-network-client'),
'-DDDF', 'start',
'--local-root=client', '--mounts-root=mnt', '--cachedir=tmp',
- '--ipc-port=%s' % client.ipc_port.value, '--api-url=%s' % client.api_url.value,
+ '--ipc-port=%s' % client.ipc_port.value, '--api=%s' % client.api.value,
])
def tearDown(self):
diff --git a/tests/units/client/offline_routes.py b/tests/units/client/offline_routes.py
index b14ccd6..c8ac58c 100755
--- a/tests/units/client/offline_routes.py
+++ b/tests/units/client/offline_routes.py
@@ -340,7 +340,7 @@ class OfflineRoutes(tests.Test):
}]
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
self.node.stop()
@@ -355,7 +355,7 @@ class OfflineRoutes(tests.Test):
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
def test_ServiceUnavailableWhileSolving(self):
diff --git a/tests/units/client/online_routes.py b/tests/units/client/online_routes.py
index cf14834..50df2ec 100755
--- a/tests/units/client/online_routes.py
+++ b/tests/units/client/online_routes.py
@@ -591,7 +591,7 @@ Can't find all required implementations:
self.assertEqual('content', file(blob_path).read())
assert exists(clone_path + '/data.blob')
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/%s/%s' % (context[:2], context))))
self.assertEqual([
@@ -628,7 +628,7 @@ Can't find all required implementations:
self.assertEqual('content', file(blob_path).read())
assert not lexists(clone_path)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/%s/%s' % (context[:2], context))))
self.assertEqual([
@@ -647,7 +647,7 @@ Can't find all required implementations:
sorted(ipc.get(['context'], reply='layer')['result']))
assert exists(clone_path + '/data.blob')
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/%s/%s' % (context[:2], context))))
def test_clone_Activity(self):
@@ -751,7 +751,7 @@ Can't find all required implementations:
self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read())
assert exists(clone_path + '/data.blob/activity/activity.info')
self.assertEqual(
- [client.api_url.value, ['stable'], downloaded_solution],
+ [client.api.value, ['stable'], downloaded_solution],
json.load(file('solutions/bu/bundle_id')))
self.assertEqual([
@@ -795,7 +795,7 @@ Can't find all required implementations:
self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read())
assert not exists(clone_path)
self.assertEqual(
- [client.api_url.value, ['stable'], downloaded_solution],
+ [client.api.value, ['stable'], downloaded_solution],
json.load(file('solutions/bu/bundle_id')))
self.assertEqual([
@@ -814,7 +814,7 @@ Can't find all required implementations:
sorted(ipc.get(['context'], reply='layer')['result']))
assert exists(clone_path + '/data.blob/activity/activity.info')
self.assertEqual(
- [client.api_url.value, ['stable'], downloaded_solution],
+ [client.api.value, ['stable'], downloaded_solution],
json.load(file('solutions/bu/bundle_id')))
def test_clone_ActivityWithStabilityPreferences(self):
@@ -987,7 +987,7 @@ Can't find all required implementations:
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], downloaded_solution],
+ [client.api.value, ['stable'], downloaded_solution],
json.load(file('solutions/bu/bundle_id')))
blob = self.zips(['TestActivity/activity/activity.info', [
@@ -1032,7 +1032,7 @@ Can't find all required implementations:
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
self.node.stop()
@@ -1047,7 +1047,7 @@ Can't find all required implementations:
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
shutil.rmtree('solutions')
@@ -1060,7 +1060,7 @@ Can't find all required implementations:
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
def test_launch_Fails(self):
@@ -1142,7 +1142,7 @@ Can't find all required implementations:
[i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')])
assert local['release'].exists(impl)
self.assertEqual(
- [client.api_url.value, ['stable'], solution],
+ [client.api.value, ['stable'], solution],
json.load(file('solutions/bu/bundle_id')))
def test_InvalidateSolutions(self):
@@ -1356,20 +1356,20 @@ Can't find all required implementations:
@db.blob_property()
def blob3(self, value):
- raise http.Redirect(client.api_url.value + prefix + 'blob4')
+ raise http.Redirect(client.api.value + prefix + 'blob4')
self.start_online_client([User, Document])
ipc = IPCConnection()
guid = ipc.post(['document'], {})
prefix = '/document/' + guid + '/'
- response = requests.request('GET', client.api_url.value + prefix + 'blob1', allow_redirects=False)
+ response = requests.request('GET', client.api.value + prefix + 'blob1', allow_redirects=False)
self.assertEqual(303, response.status_code)
self.assertEqual(prefix + 'blob2', response.headers['Location'])
- response = requests.request('GET', client.api_url.value + prefix + 'blob3', allow_redirects=False)
+ response = requests.request('GET', client.api.value + prefix + 'blob3', allow_redirects=False)
self.assertEqual(303, response.status_code)
- self.assertEqual(client.api_url.value + prefix + 'blob4', response.headers['Location'])
+ self.assertEqual(client.api.value + prefix + 'blob4', response.headers['Location'])
def test_DoNotSwitchToOfflineOnRedirectFails(self):
@@ -1386,7 +1386,7 @@ Can't find all required implementations:
local_volume = self.start_online_client([User, Document])
ipc = IPCConnection()
guid = ipc.post(['document'], {})
- prefix = client.api_url.value + '/document/' + guid + '/'
+ prefix = client.api.value + '/document/' + guid + '/'
local_volume['document'].create({'guid': guid})
trigger = self.wait_for_events(ipc, event='inline', state='connecting')
@@ -1561,7 +1561,7 @@ Can't find all required implementations:
def test_inline(self):
routes._RECONNECT_TIMEOUT = 2
- cp = ClientRoutes(Volume('client', model.RESOURCES), client.api_url.value)
+ cp = ClientRoutes(Volume('client', model.RESOURCES), client.api.value)
assert not cp.inline()
trigger = self.wait_for_events(cp, event='inline', state='online')
diff --git a/tests/units/client/releases.py b/tests/units/client/releases.py
index 9c99cec..30f938e 100755
--- a/tests/units/client/releases.py
+++ b/tests/units/client/releases.py
@@ -206,11 +206,11 @@ class Releases(tests.Test):
self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
- client.api_url.value = 'fake'
+ client.api.value = 'fake'
self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.assertEqual(solution, file(cached_path).read())
- client.api_url.value = 'http://127.0.0.1:8888'
+ client.api.value = 'http://127.0.0.1:8888'
self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event'])
self.client_routes._node_mtime = cached_mtime + 2
@@ -267,7 +267,7 @@ class Releases(tests.Test):
}]])
self.touch(['solutions/bu/bundle_id', solution])
- client.api_url.value = 'fake'
+ client.api.value = 'fake'
self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception'])
self.node.stop()
diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py
index 9fad249..7cd03e6 100755
--- a/tests/units/client/routes.py
+++ b/tests/units/client/routes.py
@@ -130,7 +130,7 @@ class RoutesTest(tests.Test):
def test_SetLocalLayerInOffline(self):
volume = db.Volume('client', model.RESOURCES)
- cp = ClientRoutes(volume, client.api_url.value)
+ cp = ClientRoutes(volume, client.api.value)
post = Request(method='POST', path=['context'])
post.content_type = 'application/json'
post.content = {
@@ -153,7 +153,7 @@ class RoutesTest(tests.Test):
def test_CachedClientRoutes(self):
volume = db.Volume('client', model.RESOURCES, lazy_open=True)
- cp = CachedClientRoutes(volume, client.api_url.value)
+ cp = CachedClientRoutes(volume, client.api.value)
post = Request(method='POST', path=['context'])
post.content_type = 'application/json'
@@ -216,7 +216,7 @@ class RoutesTest(tests.Test):
def test_CachedClientRoutes_WipeReports(self):
volume = db.Volume('client', model.RESOURCES, lazy_open=True)
- cp = CachedClientRoutes(volume, client.api_url.value)
+ cp = CachedClientRoutes(volume, client.api.value)
post = Request(method='POST', path=['report'])
post.content_type = 'application/json'
@@ -236,7 +236,7 @@ class RoutesTest(tests.Test):
def test_CachedClientRoutes_OpenOnlyChangedResources(self):
volume = db.Volume('client', model.RESOURCES, lazy_open=True)
- cp = CachedClientRoutes(volume, client.api_url.value)
+ cp = CachedClientRoutes(volume, client.api.value)
guid = call(cp, Request(method='POST', path=['context'], content_type='application/json', content={
'type': 'activity',
'title': 'title',
@@ -247,7 +247,7 @@ class RoutesTest(tests.Test):
cp.close()
volume = db.Volume('client', model.RESOURCES, lazy_open=True)
- cp = CachedClientRoutes(volume, client.api_url.value)
+ cp = CachedClientRoutes(volume, client.api.value)
trigger = self.wait_for_events(cp, event='push')
self.start_master()
@@ -260,7 +260,7 @@ class RoutesTest(tests.Test):
def test_SwitchToOfflineForAbsentOnlineProps(self):
volume = db.Volume('client', model.RESOURCES)
- cp = ClientRoutes(volume, client.api_url.value)
+ cp = ClientRoutes(volume, client.api.value)
post = Request(method='POST', path=['context'])
post.content_type = 'application/json'
diff --git a/tests/units/db/volume.py b/tests/units/db/volume.py
index 04bd9fc..3b10d7e 100755
--- a/tests/units/db/volume.py
+++ b/tests/units/db/volume.py
@@ -24,7 +24,7 @@ from sugar_network.db.directory import Directory
from sugar_network.db.index import IndexWriter
from sugar_network.toolkit.router import ACL
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http
+from sugar_network.toolkit import http, ranges
class VolumeTest(tests.Test):
@@ -78,7 +78,7 @@ class VolumeTest(tests.Test):
{'content-type': 'application/octet-stream', 'content-length': '2', 'path': 'foo/2'},
{'commit': [[1, 5]]},
],
- [dict(i) for i in volume.diff(r, ['foo'])])
+ [dict(i) for i in volume.diff(r, files=['foo'])])
self.assertEqual([[6, None]], r)
r = [[2, 2]]
@@ -126,9 +126,55 @@ class VolumeTest(tests.Test):
{'content-type': 'application/octet-stream', 'content-length': '3', 'path': 'bar/3'},
{'commit': [[7, 9]]},
],
- [dict(i) for i in volume.diff(r, ['foo', 'bar'])])
+ [dict(i) for i in volume.diff(r, files=['foo', 'bar'])])
self.assertEqual([[10, None]], r)
+ def test_diff_SyncUsecase(self):
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+ def prop1(self, value):
+ return value
+
+ @db.stored_property()
+ def prop2(self, value):
+ return value
+
+ volume = db.Volume('.', [Document])
+
+ volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1, 'prop1': 1, 'prop2': 1})
+ self.utime('db/document/gu/guid', 1)
+
+ # Fresh update to pull
+ volume['document'].update('guid', {'prop1': 2})
+ self.utime('db/document/gu/guid/prop1', 2)
+
+ # Recently pushed
+ volume['document'].update('guid', {'prop2': 2})
+ self.utime('db/document/gu/guid/prop2', 2)
+
+        # Exclude `prop2` ack from the pull ranges
+ r = [[2, None]]
+ ranges.exclude(r, 3, 3)
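+        # With seqno 3 carved out of the pull ranges the whole document is
+        # skipped and the ranges stay untouched; only the `exclude` form
+        # below lets `prop1` through while holding back `prop2`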
+ self.assertEqual([
+ {'resource': 'document'},
+ ],
+ [dict(i) for i in volume.diff(r)])
+ self.assertEqual([[2, 2], [4, None]], r)
+
+ # Pass `prop2` ack in `exclude`
+ r = [[2, None]]
+ self.assertEqual([
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'prop1': {'value': 2, 'mtime': 2},
+ }},
+ {'commit': [[2, 2]]},
+ ],
+ [dict(i) for i in volume.diff(r, [[3, 3]])])
+ self.assertEqual([[4, None]], r)
+
def test_diff_Partial(self):
self.override(time, 'time', lambda: 0)
@@ -649,33 +695,33 @@ class VolumeTest(tests.Test):
volume1 = db.Volume('db1', [Document])
volume2 = db.Volume('db2', [Document])
- committed, patched = volume2.patch(volume1.diff([[1, None]]))
+ seqno, committed = volume2.patch(volume1.diff([[1, None]]))
self.assertEqual([], committed)
- self.assertEqual([], patched)
+ self.assertEqual(None, seqno)
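+        # volume.patch() returns (seqno, committed): the last merged seqno,
+        # or None when the patch brought nothing new, plus the committed
+        # ranges either way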
volume1['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1})
- committed, patched = volume2.patch(volume1.diff([[1, None]]))
+ seqno, committed = volume2.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 1]], committed)
- self.assertEqual([[1, 1]], patched)
- committed, patched = volume2.patch(volume1.diff([[1, None]]))
+ self.assertEqual(1, seqno)
+ seqno, committed = volume2.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 1]], committed)
- self.assertEqual([], patched)
+ self.assertEqual(None, seqno)
volume1['document'].update('1', {'prop': '1'})
- committed, patched = volume2.patch(volume1.diff([[1, None]]))
+ seqno, committed = volume2.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 2]], committed)
- self.assertEqual([[2, 2]], patched)
- committed, patched = volume2.patch(volume1.diff([[1, None]]))
+ self.assertEqual(2, seqno)
+ seqno, committed = volume2.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 2]], committed)
- self.assertEqual([], patched)
+ self.assertEqual(None, seqno)
volume3 = db.Volume('db3', [Document])
- committed, patched = volume3.patch(volume1.diff([[1, None]]))
+ seqno, committed = volume3.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 2]], committed)
- self.assertEqual([[1, 1]], patched)
- committed, patched = volume3.patch(volume1.diff([[1, None]]))
+ self.assertEqual(1, seqno)
+ seqno, committed = volume3.patch(volume1.diff([[1, None]]))
self.assertEqual([[1, 2]], committed)
- self.assertEqual([], patched)
+ self.assertEqual(None, seqno)
def test_patch_CallSetters(self):
@@ -725,7 +771,7 @@ class VolumeTest(tests.Test):
yield i
patch = generator()
- self.assertEqual(([[1, 3]], [[101, 101]]), volume.patch(patch))
+ self.assertEqual((101, [[1, 3]]), volume.patch(patch))
assert volume['document'].exists('1')
diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py
index a191c8d..5fba512 100644
--- a/tests/units/node/__main__.py
+++ b/tests/units/node/__main__.py
@@ -2,15 +2,11 @@
from __init__ import tests
-from downloads import *
-from files import *
-from node import *
from obs import *
-from stats_user import *
-from sync_master import *
-from sync_offline import *
-from sync_online import *
from model import *
+from node import *
+from master import *
+from slave import *
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/node/downloads.py b/tests/units/node/downloads.py
deleted file mode 100755
index 29ea5d0..0000000
--- a/tests/units/node/downloads.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import json
-from os.path import exists
-
-from __init__ import tests
-
-from sugar_network.node import downloads
-from sugar_network.toolkit import coroutine
-
-
-class DownloadsTest(tests.Test):
-
- def test_populate(self):
- self.touch(('1', '1'))
- self.touch(('2', '2'))
- self.touch(('2.tag', json.dumps((2, {'file': 2}))))
- self.touch(('3.tag', json.dumps((3, {'file': 3}))))
-
- pool = downloads.Pool('.')
- self.assertEqual(None, pool.get(1))
- self.assertEqual(2, pool.get(2).tag['file'])
- self.assertEqual('2', pool.get(2).open().read())
- self.assertEqual(None, pool.get(3))
-
- def test_ComplexKeys(self):
- key = {-1: None}
-
- self.touch(('file', 'file'))
- self.touch(('file.tag', json.dumps((key, {'file': 2}))))
-
- pool = downloads.Pool('.')
- self.assertEqual('file', pool.get(key).open().read())
- key['foo'] = 'bar'
- pool.set(key, None, lambda path: file(path, 'w').close())
- self.assertNotEqual(None, pool.get(key))
-
- def test_set_Tags(self):
- tag = []
-
- def fetch(path):
- with file(path, 'w') as f:
- f.write('payload')
- tag.append(True)
-
- pool = downloads.Pool('.')
- dl = pool.set('key', tag, fetch)
- self.assertEqual(False, dl.ready)
- self.assertEqual(None, dl.open())
- self.assertEqual([], tag)
-
- coroutine.dispatch()
- self.assertEqual(True, dl.ready)
- self.assertEqual('payload', dl.open().read())
- self.assertEqual([True], tag)
-
- pool2 = downloads.Pool('.')
- dl2 = pool2.get('key')
- self.assertEqual(True, dl2.ready)
- self.assertEqual('payload', dl2.open().read())
- self.assertEqual([True], tag)
-
- def test_Eject(self):
- downloads._POOL_SIZE = 3
- pool = downloads.Pool('.')
-
- pool.set(1, None, lambda path: file(path, 'w').close())
- coroutine.dispatch()
- file1 = pool.get(1).open().name
- pool.set(2, None, lambda path: file(path, 'w').close())
- coroutine.dispatch()
- file2 = pool.get(2).open().name
- pool.set(3, None, lambda path: file(path, 'w').close())
- coroutine.dispatch()
- file3 = pool.get(3).open().name
-
- assert pool.get(1) is not None
- assert exists(file1)
- assert exists(file1 + '.tag')
- assert pool.get(2) is not None
- assert exists(file2)
- assert exists(file2 + '.tag')
- assert pool.get(3) is not None
- assert exists(file3)
- assert exists(file3 + '.tag')
-
- pool.set(4, None, lambda path: file(path, 'w').close())
- pool.set(5, None, lambda path: file(path, 'w').close())
- pool.set(6, None, lambda path: file(path, 'w').close())
-
- assert pool.get(1) is None
- assert not exists(file1)
- assert not exists(file1 + '.tag')
- assert pool.get(2) is None
- assert not exists(file2)
- assert not exists(file2 + '.tag')
- assert pool.get(3) is None
- assert not exists(file3)
- assert not exists(file3 + '.tag')
-
- def test_remove(self):
- pool = downloads.Pool('.')
-
- pool.set(1, None, lambda path: file(path, 'w').close())
- coroutine.dispatch()
- file1 = pool.get(1).open().name
- assert pool.get(1) is not None
- assert exists(file1)
- assert exists(file1 + '.tag')
-
- pool.remove(1)
- assert pool.get(1) is None
- assert not exists(file1)
- assert not exists(file1 + '.tag')
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/files.py b/tests/units/node/files.py
deleted file mode 100755
index cfacc30..0000000
--- a/tests/units/node/files.py
+++ /dev/null
@@ -1,357 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import time
-import json
-from glob import glob
-from os.path import exists
-from cStringIO import StringIO
-
-from __init__ import tests
-
-from sugar_network import db, toolkit
-from sugar_network.node import files
-
-
-class FilesTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
- self.uuid = 0
- self.override(toolkit, 'uuid', self.next_uuid)
-
- def next_uuid(self):
- self.uuid += 1
- return str(self.uuid)
-
- def test_Index_Populate(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- os.utime('files', (1, 1))
- assert seeder.sync()
-
- assert not seeder.sync()
- in_seq = toolkit.Sequence([[1, None]])
- self.assertEqual(
- [{'op': 'commit', 'sequence': []}],
- [i for i in seeder.diff(in_seq)])
- self.assertEqual(0, seqno.value)
- assert not exists('index')
-
- self.touch(('files/1', '1'))
- self.touch(('files/2/3', '3'))
- self.touch(('files/4/5/6', '6'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- assert not seeder.sync()
- in_seq = toolkit.Sequence([[1, None]])
- self.assertEqual(
- [{'op': 'commit', 'sequence': []}],
- [i for i in seeder.diff(in_seq)])
- self.assertEqual(0, seqno.value)
- assert not exists('index')
-
- self.utime('files', 2)
- os.utime('files', (2, 2))
-
- assert seeder.sync()
- in_seq = toolkit.Sequence([[1, None]])
- self.assertEqual(sorted([
- {'op': 'commit', 'sequence': [[1, 3]]},
- {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'},
- {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '2/3'},
- {'op': 'update', 'blob_size': 1, 'blob': '6', 'path': '4/5/6'},
- ]),
- sorted(files_diff(seeder, in_seq)))
- self.assertEqual(3, seqno.value)
- assert exists('index')
- self.assertEqual(
- [[
- [1, '1', os.stat('files/1').st_mtime],
- [2, '2/3', os.stat('files/2/3').st_mtime],
- [3, '4/5/6', os.stat('files/4/5/6').st_mtime],
- ],
- os.stat('files').st_mtime],
- json.load(file('index')))
-
- assert not seeder.sync()
- in_seq = toolkit.Sequence([[4, None]])
- self.assertEqual(
- [{'op': 'commit', 'sequence': []}],
- [i for i in seeder.diff(in_seq)])
- self.assertEqual(3, seqno.value)
-
- def test_Index_SelectiveDiff(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.touch(('files/4', '4'))
- self.touch(('files/5', '5'))
- self.utime('files', 1)
-
- in_seq = toolkit.Sequence([[2, 2], [4, 10], [20, None]])
- self.assertEqual(sorted([
- {'op': 'commit', 'sequence': [[2, 5]]},
- {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'},
- {'op': 'update', 'blob_size': 1, 'blob': '4', 'path': '4'},
- {'op': 'update', 'blob_size': 1, 'blob': '5', 'path': '5'},
- ]),
- sorted(files_diff(seeder, in_seq)))
-
- def test_Index_PartialDiff(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
-
- in_seq = toolkit.Sequence([[1, None]])
- diff = seeder.diff(in_seq)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record)
- self.assertEqual({'op': 'commit', 'sequence': []}, diff.throw(StopIteration))
- self.assertRaises(StopIteration, diff.next)
-
- diff = seeder.diff(in_seq)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'}, record)
- self.assertEqual({'op': 'commit', 'sequence': [[1, 1]]}, diff.throw(StopIteration))
- self.assertRaises(StopIteration, diff.next)
-
- def test_Index_diff_Stretch(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
-
- in_seq = toolkit.Sequence([[1, 1], [3, None]])
- diff = seeder.diff(in_seq)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, record)
- self.assertEqual({'op': 'commit', 'sequence': [[1, 3]]}, next(diff))
- self.assertRaises(StopIteration, diff.next)
-
- def test_Index_diff_DoNotStretchContinuesPacket(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
-
- in_seq = toolkit.Sequence([[1, 1], [3, None]])
- diff = seeder.diff(in_seq, toolkit.Sequence([[1, 1]]))
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record)
- record = next(diff)
- record['blob'] = ''.join([i for i in record['blob']])
- self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, record)
- self.assertEqual({'op': 'commit', 'sequence': [[1, 1], [3, 3]]}, next(diff))
- self.assertRaises(StopIteration, diff.next)
-
- def test_Index_DiffUpdatedFiles(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- for __ in seeder.diff(toolkit.Sequence([[1, None]])):
- pass
- self.assertEqual(3, seqno.value)
-
- os.utime('files/2', (2, 2))
-
- self.assertEqual(
- [{'op': 'commit', 'sequence': []}],
- [i for i in seeder.diff(toolkit.Sequence([[4, None]]))])
- self.assertEqual(3, seqno.value)
-
- os.utime('files', (3, 3))
-
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'},
- {'op': 'commit', 'sequence': [[4, 4]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[4, None]]))))
- self.assertEqual(4, seqno.value)
-
- os.utime('files/1', (4, 4))
- os.utime('files/3', (4, 4))
- os.utime('files', (4, 4))
-
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'},
- {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'},
- {'op': 'commit', 'sequence': [[5, 6]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[5, None]]))))
- self.assertEqual(6, seqno.value)
-
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'},
- {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'},
- {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'},
- {'op': 'commit', 'sequence': [[1, 6]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[1, None]]))))
- self.assertEqual(6, seqno.value)
-
- def test_Index_DiffCreatedFiles(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- for __ in seeder.diff(toolkit.Sequence([[1, None]])):
- pass
- self.assertEqual(3, seqno.value)
-
- self.touch(('files/4', '4'))
- os.utime('files/4', (2, 2))
- os.utime('files', (1, 1))
-
- self.assertEqual(
- [{'op': 'commit', 'sequence': []}],
- [i for i in seeder.diff(toolkit.Sequence([[4, None]]))])
- self.assertEqual(3, seqno.value)
-
- os.utime('files/4', (2, 2))
- os.utime('files', (2, 2))
-
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '4', 'path': '4'},
- {'op': 'commit', 'sequence': [[4, 4]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[4, None]]))))
- self.assertEqual(4, seqno.value)
-
- self.touch(('files/5', '5'))
- os.utime('files/5', (3, 3))
- self.touch(('files/6', '6'))
- os.utime('files/6', (3, 3))
- os.utime('files', (3, 3))
-
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '5', 'path': '5'},
- {'op': 'update', 'blob_size': 1, 'blob': '6', 'path': '6'},
- {'op': 'commit', 'sequence': [[5, 6]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[5, None]]))))
- self.assertEqual(6, seqno.value)
-
- def test_Index_DiffDeletedFiles(self):
- seqno = toolkit.Seqno(tests.tmpdir + '/seqno')
- seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- for __ in seeder.diff(toolkit.Sequence([[1, None]])):
- pass
- self.assertEqual(3, seqno.value)
-
- os.unlink('files/2')
- os.utime('files', (2, 2))
-
- assert seeder.sync()
- self.assertEqual(sorted([
- {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'},
- {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'},
- {'op': 'delete', 'path': '2'},
- {'op': 'commit', 'sequence': [[1, 4]]},
- ]),
- sorted(files_diff(seeder, toolkit.Sequence([[1, None]]))))
- self.assertEqual(4, seqno.value)
-
- os.unlink('files/1')
- os.unlink('files/3')
- os.utime('files', (3, 3))
-
- assert seeder.sync()
- self.assertEqual(sorted([
- {'op': 'delete', 'path': '1'},
- {'op': 'delete', 'path': '2'},
- {'op': 'delete', 'path': '3'},
- {'op': 'commit', 'sequence': [[1, 6]]},
- ]),
- sorted([i for i in seeder.diff(toolkit.Sequence([[1, None]]))]))
- self.assertEqual(6, seqno.value)
-
- assert not seeder.sync()
- self.assertEqual(sorted([
- {'op': 'delete', 'path': '1'},
- {'op': 'delete', 'path': '2'},
- {'op': 'delete', 'path': '3'},
- {'op': 'commit', 'sequence': [[1, 6]]},
- ]),
- sorted([i for i in seeder.diff(toolkit.Sequence([[1, None]]))]))
- self.assertEqual(6, seqno.value)
-
- def test_merge_Updated(self):
- self.assertEqual('commit-sequence', files.merge('dst', [
- {'op': 'update', 'path': '1', 'blob': StringIO('1')},
- {'op': 'update', 'path': '2/2', 'blob': StringIO('22')},
- {'op': 'update', 'path': '3/3/3', 'blob': StringIO('333')},
- {'op': 'commit', 'sequence': 'commit-sequence'},
- ]))
- self.assertEqual('1', file('dst/1').read())
- self.assertEqual('22', file('dst/2/2').read())
- self.assertEqual('333', file('dst/3/3/3').read())
-
- def test_merge_Deleted(self):
- self.touch('dst/1')
- self.touch('dst/2/2')
-
- self.assertEqual('commit-sequence', files.merge('dst', [
- {'op': 'delete', 'path': '1'},
- {'op': 'delete', 'path': '2/2'},
- {'op': 'delete', 'path': '3/3/3'},
- {'op': 'commit', 'sequence': 'commit-sequence'},
- ]))
- assert not exists('dst/1')
- assert not exists('dst/2/2')
- assert not exists('dst/3/3/3')
-
-
-def files_diff(index, in_seq, out_seq=None, **kwargs):
- for record in index.diff(in_seq, out_seq=out_seq, **kwargs):
- if 'blob' in record:
- record['blob'] = ''.join([i for i in record['blob']])
- yield record
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/master.py b/tests/units/node/master.py
new file mode 100755
index 0000000..69fc6a7
--- /dev/null
+++ b/tests/units/node/master.py
@@ -0,0 +1,549 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import gzip
+import time
+import json
+import base64
+import hashlib
+from glob import glob
+from os.path import join, exists, basename
+from StringIO import StringIO
+from base64 import b64decode, b64encode
+
+import rrdtool
+
+from __init__ import tests
+
+from sugar_network.client import Connection, keyfile, api
+from sugar_network.db.directory import Directory
+from sugar_network import db, node, toolkit
+from sugar_network.node.master import MasterRoutes
+from sugar_network.db.volume import Volume
+from sugar_network.model.user import User
+from sugar_network.toolkit.router import Response, File
+from sugar_network.toolkit import coroutine, parcel, http
+
+
+class MasterTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+
+ def next_uuid():
+ self.uuid += 1
+ return self.uuid
+
+ self.uuid = 0
+ self.override(toolkit, 'uuid', next_uuid)
+
+ def test_push(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+ self.touch(('blob1', '1'))
+ self.touch(('blob2', '2'))
+
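+        # parcel.encode() takes a list of (packet, header, records) tuples
+        # plus a global header; File records stream blob content with their
+        # meta (content-length, optional target path) inline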
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ File('./blob1', meta={'content-length': '1'}),
+ File('./blob2', meta={'content-length': '1', 'path': 'foo/bar'}),
+ {'commit': [[1, 3]]},
+ ]),
+ ], header={'to': self.node_routes.guid, 'from': 'slave'}))
+ response = conn.request('POST', [], patch, params={'cmd': 'push'})
+ reply = parcel.decode(response.raw)
+
+ assert volume['document'].exists('1')
+ blob = volume.blobs.get(hashlib.sha1('1').hexdigest())
+ self.assertEqual('1', ''.join(blob.iter_content()))
+ blob = volume.blobs.get('foo/bar')
+ self.assertEqual('2', ''.join(blob.iter_content()))
+
+ self.assertEqual({
+ 'packet': 'ack',
+ 'from': self.node_routes.guid,
+ 'to': 'slave',
+ 'ack': [[1, 1]],
+ 'ranges': [[1, 3]],
+ },
+ next(reply).header)
+ self.assertRaises(StopIteration, next, reply)
+
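+        # The master replies with an `ack` packet and mirrors it in the
+        # node cookie as per-slave [pushed-ranges, acked-ranges] pairs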
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'ack': {
+ 'slave': [[[[1, 3]], [[1, 1]]]],
+ },
+ })),
+ response.headers['set-cookie'])
+
+ def test_push_MisaddressedPackets(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ], header={'from': 'slave'}))
+ self.assertRaises(http.BadRequest, conn.request, 'POST', [], patch, params={'cmd': 'push'})
+
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ], header={'to': 'fake', 'from': 'slave'}))
+ self.assertRaises(http.BadRequest, conn.request, 'POST', [], patch, params={'cmd': 'push'})
+
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ], header={'to': '127.0.0.1:7777', 'from': 'slave'}))
+ conn.request('POST', [], patch, params={'cmd': 'push'})
+
+ def test_push_WithCookies(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+ self.touch(('blob', 'blob'))
+
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ }},
+ File('./blob', meta={'content-length': str(len('blob'))}),
+ {'commit': [[1, 2]]},
+ ]),
+ ], header={'to': self.node_routes.guid, 'from': 'slave'}))
+ response = conn.request('POST', [], patch, params={'cmd': 'push'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'ack': {
+ 'slave': [[[[100, 100]], [[200, 200]]]],
+ },
+ })),
+ })
+ reply = parcel.decode(response.raw)
+
+ assert volume['document'].exists('1')
+ blob_digest = hashlib.sha1('blob').hexdigest()
+ blob = volume.blobs.get(blob_digest)
+ self.assertEqual('blob', ''.join(blob.iter_content()))
+
+ self.assertEqual({
+ 'packet': 'ack',
+ 'from': self.node_routes.guid,
+ 'to': 'slave',
+ 'ack': [[1, 1]],
+ 'ranges': [[1, 2]],
+ },
+ next(reply).header)
+ self.assertRaises(StopIteration, next, reply)
+
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'ack': {
+ 'slave': [[[[100, 100]], [[200, 200]]], [[[1, 2]], [[1, 1]]]],
+ },
+ })),
+ response.headers['set-cookie'])
+
+ def test_push_Forward(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+ self.touch(('blob', 'blob'))
+
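+        # Non-push packets are not processed inline: pulls and requests are
+        # folded into the node cookie so a follow-up pull can serve them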
+ patch = ''.join(parcel.encode([
+ ('pull', {'ranges': [[1, None]]}, []),
+ ('request', {'for': 1}, []),
+ ], header={'to': self.node_routes.guid, 'from': 'slave'}))
+ response = conn.request('POST', [], patch, params={'cmd': 'push'})
+ reply = parcel.decode(response.raw)
+ self.assertRaises(StopIteration, next, reply)
+
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {'slave': []},
+ 'request': [
+ {'to': '127.0.0.1:7777', 'from': 'slave', 'packet': 'request', 'for': 1},
+ ],
+ })),
+ response.headers['set-cookie'])
+
+ def test_pull(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1})
+ self.utime('master/db/document/gu/guid', 1)
+ blob = volume.blobs.post('a')
+ self.touch(('master/files/foo/bar', 'bb'))
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'ctime': {'mtime': 1, 'value': 1},
+ 'guid': {'mtime': 1, 'value': 'guid'},
+ 'mtime': {'mtime': 1, 'value': 1},
+ }},
+ {'content-length': '1', 'content-type': 'application/octet-stream'},
+ {'content-length': '2', 'content-type': 'application/octet-stream', 'path': 'foo/bar'},
+ {'commit': [[1, 3]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'id': 1,
+ 'pull': [[1, None]],
+ })),
+ response.headers['set-cookie'])
+
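+        # Replaying the returned cookie drains the pull: the second request
+        # comes back empty and the cookie gets unset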
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={'cookie': response.headers['set-cookie']})
+ assert not response.raw.read()
+ self.assertEqual(
+ 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly',
+ response.headers['set-cookie'])
+
+ def test_pull_ExcludeAcks(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('master/db/document/1/1', 1)
+ blob = volume.blobs.post('blob')
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node': [[[[0, 0]], [[1, 1]]], [[[0, 0]], [[2, 2]]]],
+ },
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]),
+ ],
+ [(packet.header, [record for record in packet]) for packet in parcel.decode(response.raw)])
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'id': 1,
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node': [[[[0, 0]], [[1, 1]]], [[[0, 0]], [[2, 2]]]],
+ },
+ })),
+ response.headers['set-cookie'])
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={'cookie': response.headers['set-cookie']})
+ assert not response.raw.read()
+ self.assertEqual(
+ 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly',
+ response.headers['set-cookie'])
+
+ def test_pull_ExcludeOnlyAcksIntersection(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
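+        # Only ranges acked by every node are excluded from the pull; any
+        # range at least one node still lacks is sent again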
+ volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('master/db/document/1/1', 1)
+ blob1 = volume.blobs.post('a')
+ volume['document'].create({'guid': '2', 'ctime': 2, 'mtime': 2})
+ self.utime('master/db/document/2/2', 2)
+ blob2 = volume.blobs.post('bb')
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node1': [[[[0, 0]], [[1, 4]]]],
+ 'node2': [[[[0, 0]], [[1, 4]]]],
+ },
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node1': [[[[0, 0]], [[1, 4]]]],
+ 'node2': [[[[0, 0]], [[2, 4]]]],
+ },
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'mtime': 1, 'value': '1'},
+ 'ctime': {'mtime': 1, 'value': 1},
+ 'mtime': {'mtime': 1, 'value': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node1': [[[[0, 0]], [[1, 4]]]],
+ 'node2': [[[[0, 0]], [[1, 3]]]],
+ },
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'content-length': '2', 'content-type': 'application/octet-stream'},
+ {'commit': [[4, 4]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+
+ def test_pull_ExcludeAckRequests(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1})
+ self.utime('master/db/document/1/1', 1)
+ blob = volume.blobs.post('blob')
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ 'ack': {
+ 'node1': [[[[0, 0]], [[1, 2]]]],
+ 'node2': [],
+ },
+ 'request': [
+ {'from': 'node2', 'origin': 'node1', 'ranges': [[0, 0]]},
+ ],
+ }))
+ })
+ reply = parcel.decode(response.raw)
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'to': 'node2', 'packet': 'ack', 'ack': [[1, 2]]}, []),
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in reply])
+
+    def test_pull_Limited(self):
+ RECORD = 1024 * 1024
+
+ class Document(db.Resource):
+
+ @db.stored_property()
+        def prop(self, value):
+            return value
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1, 'prop': '.' * RECORD})
+ self.utime('master/db/document/1/1', 1)
+ volume['document'].create({'guid': '2', 'ctime': 2, 'mtime': 2, 'prop': '.' * RECORD})
+ self.utime('master/db/document/2/2', 2)
+ volume['document'].create({'guid': '3', 'ctime': 3, 'mtime': 3, 'prop': '.' * RECORD})
+ self.utime('master/db/document/3/3', 3)
+
+ response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * .5)}, headers={
+ 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({
+ 'pull': [[1, None]],
+ }))
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'id': 1,
+ 'pull': [[1, None]],
+ })),
+ response.headers['set-cookie'])
+
+ response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * 1.5)}, headers={
+ 'cookie': response.headers['set-cookie'],
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'ctime': {'mtime': 1, 'value': 1},
+ 'guid': {'mtime': 1, 'value': '1'},
+ 'mtime': {'mtime': 1, 'value': 1},
+ 'prop': {'mtime': 1, 'value': '.' * RECORD},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'id': 1,
+ 'pull': [[1, None]],
+ })),
+ response.headers['set-cookie'])
+
+ response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * 2.5)}, headers={
+ 'cookie': response.headers['set-cookie'],
+ })
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'guid': '2', 'patch': {
+ 'ctime': {'mtime': 2, 'value': 2},
+ 'guid': {'mtime': 2, 'value': '2'},
+ 'mtime': {'mtime': 2, 'value': 2},
+ 'prop': {'mtime': 2, 'value': '.' * RECORD},
+ }},
+ {'guid': '3', 'patch': {
+ 'ctime': {'mtime': 3, 'value': 3},
+ 'guid': {'mtime': 3, 'value': '3'},
+ 'mtime': {'mtime': 3, 'value': 3},
+ 'prop': {'mtime': 3, 'value': '.' * RECORD},
+ }},
+ {'commit': [[2, 3]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+ self.assertEqual(
+ 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({
+ 'id': 1,
+ 'pull': [[2, None]],
+ })),
+ response.headers['set-cookie'])
+
+ response = conn.request('GET', [], params={'cmd': 'pull'}, headers={
+ 'cookie': response.headers['set-cookie'],
+ })
+ assert not response.raw.read()
+ self.assertEqual(
+ 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly',
+ response.headers['set-cookie'])
+
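accept_length caps the response size, and the id/pull fields in the returned cookie let the client resume: whatever did not fit stays in the pull window for the next request, until an empty response unsets the cookie. The budgeting reduces to a sketch like this (record sizing simplified):

    def fit(records, accept_length):
        # records: (seqno, size) pairs in seqno order
        sent, used = [], 0
        for seqno, size in records:
            if used + size > accept_length:
                break  # deferred to the next pull request
            sent.append(seqno)
            used += size
        return sent

    # with three ~1MB diffs and a 1.5MB budget only the first fits;
    # the cookie keeps the remaining window so the next GET continues.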
+ def test_sync(self):
+
+ class Document(db.Resource):
+ pass
+
+ volume = self.start_master([User, Document])
+ conn = Connection(auth=http.SugarAuth(keyfile.value))
+
+ volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1})
+ self.utime('master/db/document/gu/guid', 1)
+ blob1 = volume.blobs.post('1')
+
+ self.touch(('blob2', 'ccc'))
+ patch = ''.join(parcel.encode([
+ ('push', None, [
+ {'resource': 'document'},
+ {'guid': '2', 'patch': {
+ 'guid': {'value': '2', 'mtime': 2},
+ 'ctime': {'value': 2, 'mtime': 2},
+ 'mtime': {'value': 2, 'mtime': 2},
+ }},
+ File('./blob2', meta={'content-length': '3'}),
+ {'commit': [[1, 2]]},
+ ]),
+ ('pull', {'ranges': [[1, None]]}, []),
+ ], header={'to': '127.0.0.1:7777', 'from': 'node'}))
+ response = conn.request('POST', [], patch, params={'cmd': 'sync'})
+ blob2 = volume.blobs.get(hashlib.sha1('ccc').hexdigest())
+
+ self.assertEqual([
+ ({'from': '127.0.0.1:7777', 'to': 'node', 'packet': 'ack', 'ack': [[3, 3]], 'ranges': [[1, 2]]}, []),
+ ({'from': '127.0.0.1:7777', 'packet': 'push'}, [
+ {'resource': 'document'},
+ {'guid': 'guid', 'patch': {
+ 'ctime': {'mtime': 1, 'value': 1},
+ 'guid': {'mtime': 1, 'value': 'guid'},
+ 'mtime': {'mtime': 1, 'value': 1},
+ }},
+ {'content-length': '1', 'content-type': 'application/octet-stream'},
+ {'commit': [[1, 2]]},
+ ]),
+ ],
+ [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)])
+
+ assert volume['document'].exists('2')
+ self.assertEqual('ccc', ''.join(blob2.iter_content()))
+
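cmd='sync' folds both directions into one POST: the request parcel carries the caller's push plus a pull, and the response parcel an ack for the merged ranges plus the master's own push. Mirroring the test, a slave-side round trip could look like this (records, node_id and master_id are placeholders):

    patch = ''.join(parcel.encode([
        ('push', None, records),                # local changes
        ('pull', {'ranges': [[1, None]]}, []),  # everything not yet seen
    ], header={'to': master_id, 'from': node_id}))
    response = conn.request('POST', [], patch, params={'cmd': 'sync'})
    for packet in parcel.decode(response.raw):
        if packet.header['packet'] == 'ack':
            pass  # advance the local push window
        elif packet.header['packet'] == 'push':
            pass  # merge the master's records locally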
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/model.py b/tests/units/node/model.py
index 12b5a21..36937dc 100755
--- a/tests/units/node/model.py
+++ b/tests/units/node/model.py
@@ -7,7 +7,7 @@ import time
from __init__ import tests
from sugar_network import db, toolkit
-from sugar_network.client import Connection, keyfile, api_url
+from sugar_network.client import Connection, keyfile, api
from sugar_network.model.user import User
from sugar_network.model.post import Post
from sugar_network.model.context import Context
@@ -42,404 +42,6 @@ class ModelTest(tests.Test):
], [i for i in events if i['event'] == 'release'])
self.assertEqual(1, volume.releases_seqno.value)
- def test_diff(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = self.start_master([User, Document])
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- guid1 = conn.post(['document'], {'prop': 'a'})
- self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1)
- guid2 = conn.post(['document'], {'prop': 'b'})
- self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2)
-
- in_seq = toolkit.Sequence([[1, None]])
- self.assertEqual([
- {'resource': 'document'},
- {'guid': guid1,
- 'diff': {
- 'guid': {'value': guid1, 'mtime': 1},
- 'mtime': {'value': 0, 'mtime': 1},
- 'ctime': {'value': 0, 'mtime': 1},
- 'prop': {'value': 'a', 'mtime': 1},
- 'author': {'mtime': 1, 'value': {}},
- 'layer': {'mtime': 1, 'value': []},
- 'tags': {'mtime': 1, 'value': []},
- },
- },
- {'guid': guid2,
- 'diff': {
- 'guid': {'value': guid2, 'mtime': 2},
- 'mtime': {'value': 0, 'mtime': 2},
- 'ctime': {'value': 0, 'mtime': 2},
- 'prop': {'value': 'b', 'mtime': 2},
- 'author': {'mtime': 2, 'value': {}},
- 'layer': {'mtime': 2, 'value': []},
- 'tags': {'mtime': 2, 'value': []},
- },
- },
- {'commit': [[1, 2]]},
- ],
- [i for i in model.diff(volume, in_seq)])
- self.assertEqual([[1, None]], in_seq)
-
- def test_diff_Partial(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = self.start_master([User, Document])
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- guid1 = conn.post(['document'], {'prop': 'a'})
- self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1)
- guid2 = conn.post(['document'], {'prop': 'b'})
- self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2)
-
- in_seq = toolkit.Sequence([[1, None]])
- patch = model.diff(volume, in_seq)
- self.assertEqual({'resource': 'document'}, next(patch))
- self.assertEqual(guid1, next(patch)['guid'])
- self.assertEqual({'commit': []}, patch.throw(StopIteration()))
- try:
- next(patch)
- assert False
- except StopIteration:
- pass
-
- patch = model.diff(volume, in_seq)
- self.assertEqual({'resource': 'document'}, next(patch))
- self.assertEqual(guid1, next(patch)['guid'])
- self.assertEqual(guid2, next(patch)['guid'])
- self.assertEqual({'commit': [[1, 1]]}, patch.throw(StopIteration()))
- try:
- next(patch)
- assert False
- except StopIteration:
- pass
-
- def test_diff_Stretch(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = self.start_master([User, Document])
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- guid1 = conn.post(['document'], {'prop': 'a'})
- self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1)
- guid2 = conn.post(['document'], {'prop': 'b'})
- volume['document'].delete(guid2)
- guid3 = conn.post(['document'], {'prop': 'c'})
- self.utime('master/document/%s/%s' % (guid3[:2], guid3), 2)
- guid4 = conn.post(['document'], {'prop': 'd'})
- volume['document'].delete(guid4)
- guid5 = conn.post(['document'], {'prop': 'f'})
- self.utime('master/document/%s/%s' % (guid5[:2], guid5), 2)
-
- in_seq = toolkit.Sequence([[1, None]])
- patch = model.diff(volume, in_seq)
- self.assertEqual({'resource': 'document'}, patch.send(None))
- self.assertEqual(guid1, patch.send(None)['guid'])
- self.assertEqual(guid3, patch.send(None)['guid'])
- self.assertEqual(guid5, patch.send(None)['guid'])
- self.assertEqual({'commit': [[1, 1], [3, 3]]}, patch.throw(StopIteration()))
- try:
- patch.send(None)
- assert False
- except StopIteration:
- pass
-
- patch = model.diff(volume, in_seq)
- self.assertEqual({'resource': 'document'}, patch.send(None))
- self.assertEqual(guid1, patch.send(None)['guid'])
- self.assertEqual(guid3, patch.send(None)['guid'])
- self.assertEqual(guid5, patch.send(None)['guid'])
- self.assertEqual({'commit': [[1, 5]]}, patch.send(None))
- try:
- patch.send(None)
- assert False
- except StopIteration:
- pass
-
- def test_diff_DoNotStretchContinuesPacket(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = self.start_master([User, Document])
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- guid1 = conn.post(['document'], {'prop': 'a'})
- volume['document'].delete(guid1)
- guid2 = conn.post(['document'], {'prop': 'b'})
- volume['document'].delete(guid2)
- guid3 = conn.post(['document'], {'prop': 'c'})
- self.utime('master/document/%s/%s' % (guid3[:2], guid3), 2)
- guid4 = conn.post(['document'], {'prop': 'd'})
- volume['document'].delete(guid4)
- guid5 = conn.post(['document'], {'prop': 'f'})
- self.utime('master/document/%s/%s' % (guid5[:2], guid5), 2)
-
- in_seq = toolkit.Sequence([[1, None]])
- patch = model.diff(volume, in_seq, toolkit.Sequence([[1, 1]]))
- self.assertEqual({'resource': 'document'}, patch.send(None))
- self.assertEqual(guid3, patch.send(None)['guid'])
- self.assertEqual(guid5, patch.send(None)['guid'])
- self.assertEqual({'commit': [[1, 1], [3, 3], [5, 5]]}, patch.send(None))
- try:
- patch.send(None)
- assert False
- except StopIteration:
- pass
-
- def test_diff_TheSameInSeqForAllDocuments(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document1(db.Resource):
- pass
-
- class Document2(db.Resource):
- pass
-
- class Document3(db.Resource):
- pass
-
- volume = self.start_master([User, Document1, Document2, Document3])
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- guid3 = conn.post(['document1'], {})
- self.utime('master/document/%s/%s' % (guid3[:2], guid3), 3)
- guid2 = conn.post(['document2'], {})
- self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2)
- guid1 = conn.post(['document3'], {})
- self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1)
-
- in_seq = toolkit.Sequence([[1, None]])
- patch = model.diff(volume, in_seq)
- self.assertEqual({'resource': 'document1'}, patch.send(None))
- self.assertEqual(guid3, patch.send(None)['guid'])
- self.assertEqual({'resource': 'document2'}, patch.send(None))
- self.assertEqual(guid2, patch.send(None)['guid'])
- self.assertEqual({'resource': 'document3'}, patch.send(None))
- self.assertEqual(guid1, patch.send(None)['guid'])
- self.assertEqual({'commit': [[1, 3]]}, patch.send(None))
- try:
- patch.send(None)
- assert False
- except StopIteration:
- pass
-
- def test_merge_Create(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document1(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- class Document2(db.Resource):
- pass
-
- self.touch(('master/db.seqno', '100'))
- volume = self.start_master([Document1, Document2])
-
- records = [
- {'resource': 'document1'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'ctime': {'value': 2, 'mtime': 2.0},
- 'mtime': {'value': 3, 'mtime': 3.0},
- 'prop': {'value': '4', 'mtime': 4.0},
- }},
- {'resource': 'document2'},
- {'guid': '5', 'diff': {
- 'guid': {'value': '5', 'mtime': 5.0},
- 'ctime': {'value': 6, 'mtime': 6.0},
- 'mtime': {'value': 7, 'mtime': 7.0},
- }},
- {'commit': [[1, 2]]},
- ]
- self.assertEqual(([[1, 2]], [[101, 102]]), model.merge(volume, records))
-
- self.assertEqual(
- {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3},
- volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(1, os.stat('master/document1/1/1/guid').st_mtime)
- self.assertEqual(2, os.stat('master/document1/1/1/ctime').st_mtime)
- self.assertEqual(3, os.stat('master/document1/1/1/mtime').st_mtime)
- self.assertEqual(4, os.stat('master/document1/1/1/prop').st_mtime)
-
- self.assertEqual(
- {'guid': '5', 'ctime': 6, 'mtime': 7},
- volume['document2'].get('5').properties(['guid', 'ctime', 'mtime']))
- self.assertEqual(5, os.stat('master/document2/5/5/guid').st_mtime)
- self.assertEqual(6, os.stat('master/document2/5/5/ctime').st_mtime)
- self.assertEqual(7, os.stat('master/document2/5/5/mtime').st_mtime)
-
- def test_merge_Update(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- self.touch(('master/db.seqno', '100'))
- volume = db.Volume('master', [Document])
- volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1})
- for i in os.listdir('master/document/1/1'):
- os.utime('master/document/1/1/%s' % i, (2, 2))
-
- records = [
- {'resource': 'document'},
- {'guid': '1', 'diff': {'prop': {'value': '2', 'mtime': 1.0}}},
- {'commit': [[1, 1]]},
- ]
- self.assertEqual(([[1, 1]], []), model.merge(volume, records))
- self.assertEqual(
- {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(2, os.stat('master/document/1/1/prop').st_mtime)
-
- records = [
- {'resource': 'document'},
- {'guid': '1', 'diff': {'prop': {'value': '3', 'mtime': 2.0}}},
- {'commit': [[2, 2]]},
- ]
- self.assertEqual(([[2, 2]], []), model.merge(volume, records))
- self.assertEqual(
- {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(2, os.stat('master/document/1/1/prop').st_mtime)
-
- records = [
- {'resource': 'document'},
- {'guid': '1', 'diff': {'prop': {'value': '4', 'mtime': 3.0}}},
- {'commit': [[3, 3]]},
- ]
- self.assertEqual(([[3, 3]], [[102, 102]]), model.merge(volume, records))
- self.assertEqual(
- {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(3, os.stat('master/document/1/1/prop').st_mtime)
-
- def test_merge_MultipleCommits(self):
- self.override(time, 'time', lambda: 0)
-
- class Document(db.Resource):
-
- @db.stored_property()
- def prop(self, value):
- return value
-
- self.touch(('master/db.seqno', '100'))
- volume = db.Volume('master', [Document])
-
- def generator():
- for i in [
- {'resource': 'document'},
- {'commit': [[1, 1]]},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'ctime': {'value': 2, 'mtime': 2.0},
- 'mtime': {'value': 3, 'mtime': 3.0},
- 'prop': {'value': '4', 'mtime': 4.0},
- }},
- {'commit': [[2, 3]]},
- ]:
- yield i
-
- records = generator()
- self.assertEqual(([[1, 3]], [[101, 101]]), model.merge(volume, records))
- assert volume['document'].exists('1')
-
- def test_diff_ByLayers(self):
- self.override(time, 'time', lambda: 0)
- self.override(NodeRoutes, 'authorize', lambda self, user, role: True)
-
- class Context(db.Resource):
- pass
-
- class Post(db.Resource):
- pass
-
- this.request = Request()
- volume = db.Volume('db', [Context, Post])
- volume['context'].create({'guid': '0', 'ctime': 1, 'mtime': 1, 'layer': ['layer0', 'common']})
- volume['context'].create({'guid': '1', 'ctime': 1, 'mtime': 1, 'layer': ['layer1']})
- volume['post'].create({'guid': '3', 'ctime': 3, 'mtime': 3, 'layer': 'layer3'})
-
- volume['context'].update('0', {'tags': '0'})
- volume['context'].update('1', {'tags': '1'})
- volume['post'].update('3', {'tags': '3'})
- self.utime('db', 0)
-
- self.assertEqual(sorted([
- {'resource': 'context'},
- {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}},
- {'guid': '1', 'diff': {'tags': {'value': '1', 'mtime': 0}}},
- {'resource': 'post'},
- {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}},
- {'commit': [[4, 6]]},
- ]),
- sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]))]))
-
- self.assertEqual(sorted([
- {'resource': 'context'},
- {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}},
- {'guid': '1', 'diff': {'tags': {'value': '1', 'mtime': 0}}},
- {'resource': 'post'},
- {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}},
- {'commit': [[4, 6]]},
- ]),
- sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='layer1')]))
-
- self.assertEqual(sorted([
- {'resource': 'context'},
- {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}},
- {'resource': 'post'},
- {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}},
- {'commit': [[4, 6]]},
- ]),
- sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='layer2')]))
-
- self.assertEqual(sorted([
- {'resource': 'context'},
- {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}},
- {'resource': 'post'},
- {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}},
- {'commit': [[4, 6]]},
- ]),
- sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='foo')]))
-
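The deleted tests also document the old generator contract for model.diff: the first record names the resource, per-guid diffs follow, and the caller may abort with throw(StopIteration()) to receive a final commit record covering only what was actually consumed. In outline (reconstructed from the removed tests, not from the new code):

    patch = model.diff(volume, in_seq)
    header = patch.send(None)              # {'resource': 'document'}
    first = patch.send(None)               # {'guid': ..., 'diff': {...}}
    commit = patch.throw(StopIteration())  # e.g. {'commit': [[1, 1]]}
    # ranges not covered by the commit stay in in_seq for the next call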
def test_Packages(self):
self.override(obs, 'get_repos', lambda: [
{'lsb_id': 'Gentoo', 'lsb_release': '2.1', 'name': 'Gentoo-2.1', 'arches': ['x86', 'x86_64']},
@@ -449,7 +51,7 @@ class ModelTest(tests.Test):
self.override(obs, 'resolve', lambda repo, arch, names: {'version': '1.0'})
volume = self.start_master([User, model.Context])
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
guid = conn.post(['context'], {
'type': 'package',
@@ -463,7 +65,7 @@ class ModelTest(tests.Test):
})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']},
},
@@ -487,7 +89,7 @@ class ModelTest(tests.Test):
})
self.assertEqual({
'Gentoo': {
- 'seqno': 5,
+ 'seqno': 6,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']},
},
@@ -509,7 +111,7 @@ class ModelTest(tests.Test):
})
self.assertEqual({
'Debian-6.0': {
- 'seqno': 7,
+ 'seqno': 8,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']},
},
@@ -526,7 +128,7 @@ class ModelTest(tests.Test):
self.override(obs, 'resolve', lambda repo, arch, names: enforce(False, 'resolve failed'))
volume = self.start_master([User, model.Context])
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
guid = conn.post(['context'], {
'type': 'package',
@@ -540,7 +142,7 @@ class ModelTest(tests.Test):
})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']},
},
@@ -558,7 +160,7 @@ class ModelTest(tests.Test):
])
volume = self.start_master([User, model.Context])
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
guid = conn.post(['context'], {
'type': 'package',
'title': 'title',
@@ -570,7 +172,7 @@ class ModelTest(tests.Test):
conn.put(['context', guid, 'releases', '*'], {'binary': '1'})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['1']},
},
@@ -586,12 +188,12 @@ class ModelTest(tests.Test):
conn.put(['context', guid, 'releases', 'Debian'], {'binary': '2'})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['1']},
},
'Debian': {
- 'seqno': 4,
+ 'seqno': 5,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['2']},
},
@@ -607,17 +209,17 @@ class ModelTest(tests.Test):
conn.put(['context', guid, 'releases', 'Debian-6.0'], {'binary': '3'})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['1']},
},
'Debian': {
- 'seqno': 4,
+ 'seqno': 5,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['2']},
},
'Debian-6.0': {
- 'seqno': 5,
+ 'seqno': 6,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['3']},
},
@@ -633,17 +235,17 @@ class ModelTest(tests.Test):
conn.put(['context', guid, 'releases', 'Debian'], {'binary': '4'})
self.assertEqual({
'*': {
- 'seqno': 3,
+ 'seqno': 4,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['1']},
},
'Debian': {
- 'seqno': 6,
+ 'seqno': 7,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['4']},
},
'Debian-6.0': {
- 'seqno': 5,
+ 'seqno': 6,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {'binary': ['3']},
},
diff --git a/tests/units/node/node.py b/tests/units/node/node.py
index 82d4e43..36c6285 100755
--- a/tests/units/node/node.py
+++ b/tests/units/node/node.py
@@ -16,10 +16,8 @@ from os.path import exists, join
from __init__ import tests
from sugar_network import db, node, model, client
-from sugar_network.client import Connection, keyfile, api_url
+from sugar_network.client import Connection, keyfile, api
from sugar_network.toolkit import http, coroutine
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.node import stats_user
from sugar_network.node.routes import NodeRoutes
from sugar_network.node.master import MasterRoutes
from sugar_network.model.user import User
@@ -32,74 +30,6 @@ from sugar_network.toolkit import http
class NodeTest(tests.Test):
- def setUp(self):
- tests.Test.setUp(self)
- node.stats_root.value = 'stats'
- stats_user.stats_user_step.value = 1
- stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100']
-
- def test_UserStats(self):
- volume = self.start_master()
- conn = Connection(auth=http.SugarAuth(keyfile.value))
-
- this.call(method='POST', path=['user'], principal=tests.UID, content={
- 'name': 'user',
- 'pubkey': tests.PUBKEY,
- })
-
- ts = int(time.time())
-
- self.assertEqual({
- 'enable': True,
- 'status': {},
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'step': stats_user.stats_user_step.value,
- },
- this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID))
-
- this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={
- 'name': 'test',
- 'values': [(ts + 1, {'field': '1'})],
- })
-
- self.assertEqual({
- 'enable': True, 'status': {
- 'test': ts + 2,
- },
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'step': stats_user.stats_user_step.value,
- },
- this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID))
-
- this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={
- 'name': 'test',
- 'values': [(ts + 2, {'field': '2'})],
- })
-
- self.assertEqual({
- 'enable': True, 'status': {
- 'test': ts + 3,
- },
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'step': stats_user.stats_user_step.value,
- },
- this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID))
-
- this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={
- 'name': 'test2',
- 'values': [(ts + 3, {'field': '3'})],
- })
-
- self.assertEqual({
- 'enable': True, 'status': {
- 'test': ts + 3,
- 'test2': ts + 4,
- },
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'step': stats_user.stats_user_step.value,
- },
- this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID))
-
def test_HandleDeletes(self):
volume = self.start_master()
conn = Connection(auth=http.SugarAuth(keyfile.value))
@@ -111,7 +41,7 @@ class NodeTest(tests.Test):
'summary': 'summary',
'description': 'description',
})
- guid_path = 'master/context/%s/%s' % (guid[:2], guid)
+ guid_path = 'master/db/context/%s/%s' % (guid[:2], guid)
assert exists(guid_path)
self.assertEqual({
@@ -181,6 +111,9 @@ class NodeTest(tests.Test):
class Routes(NodeRoutes):
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, 'node', **kwargs)
+
@route('GET', [None, None], cmd='probe1', acl=ACL.AUTH)
def probe1(self, directory):
pass
@@ -208,6 +141,9 @@ class NodeTest(tests.Test):
class Routes(NodeRoutes):
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, 'node', **kwargs)
+
@route('GET', [None, None], cmd='probe1', acl=ACL.AUTHOR)
def probe1(self):
pass
@@ -260,6 +196,9 @@ class NodeTest(tests.Test):
class Routes(NodeRoutes):
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, 'node', **kwargs)
+
@route('PROBE', acl=ACL.SUPERUSER)
def probe(self):
return 'ok'
@@ -319,6 +258,9 @@ class NodeTest(tests.Test):
class Routes(NodeRoutes):
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, 'node', **kwargs)
+
@route('PROBE', acl=ACL.SUPERUSER)
def probe(self):
pass
@@ -338,6 +280,9 @@ class NodeTest(tests.Test):
class Routes(NodeRoutes):
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, 'node', **kwargs)
+
@route('PROBE1', acl=ACL.AUTH)
def probe1(self, request):
pass
@@ -479,27 +424,26 @@ class NodeTest(tests.Test):
})
def test_PackagesRoute(self):
- node.files_root.value = '.'
- self.touch(('packages/repo/arch/package', 'file'))
volume = self.start_master()
client = Connection(auth=http.SugarAuth(keyfile.value))
- self.assertEqual(['repo'], client.get(['packages']))
- self.assertEqual(['arch'], client.get(['packages', 'repo']))
+ self.touch(('master/files/packages/repo/arch/package', 'file'))
+ volume.blobs.populate()
+
+ self.assertEqual([], client.get(['packages']))
+ self.assertEqual([], client.get(['packages', 'repo']))
self.assertEqual(['package'], client.get(['packages', 'repo', 'arch']))
self.assertEqual('file', client.get(['packages', 'repo', 'arch', 'package']))
def test_PackageUpdatesRoute(self):
- node.files_root.value = '.'
- self.touch(
- ('packages/repo/1', '', 1),
- ('packages/repo/1.1', '', 1),
- ('packages/repo/2', '', 2),
- ('packages/repo/2.2', '', 2),
- )
volume = self.start_master()
ipc = Connection(auth=http.SugarAuth(keyfile.value))
+ self.touch('master/files/packages/repo/1', 'master/files/packages/repo/1.1')
+ volume.blobs.populate()
+ self.touch('master/files/packages/repo/2', 'master/files/packages/repo/2.2')
+ volume.blobs.populate()
+
self.assertEqual(
sorted(['1', '2']),
sorted(ipc.get(['packages', 'repo', 'updates'])))
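The reworked package routes no longer read node.files_root directly: files dropped under master/files/ become visible once volume.blobs.populate() has scanned the tree, and are then served through the blobs layer. Mirroring the test setup:

    self.touch(('master/files/packages/repo/arch/package', 'file'))
    volume.blobs.populate()  # index the new file as a blob
    assert client.get(['packages', 'repo', 'arch']) == ['package']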
@@ -552,7 +496,7 @@ class NodeTest(tests.Test):
self.assertEqual({
release: {
- 'seqno': 4,
+ 'seqno': 6,
'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}},
'value': {
'license': ['Public Domain'],
@@ -580,7 +524,7 @@ class NodeTest(tests.Test):
def test_Solve(self):
volume = self.start_master()
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
activity_file = json.load(conn.request('POST', ['context'],
self.zips(('topdir/activity/activity.info', '\n'.join([
@@ -623,7 +567,7 @@ class NodeTest(tests.Test):
def test_SolveWithArguments(self):
volume = self.start_master()
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
activity_file = json.load(conn.request('POST', ['context'],
self.zips(('topdir/activity/activity.info', '\n'.join([
@@ -683,7 +627,7 @@ class NodeTest(tests.Test):
def test_Clone(self):
volume = self.start_master()
- conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value))
+ conn = http.Connection(api.value, http.SugarAuth(keyfile.value))
activity_info = '\n'.join([
'[Activity]',
diff --git a/tests/units/node/slave.py b/tests/units/node/slave.py
new file mode 100755
index 0000000..6e75602
--- /dev/null
+++ b/tests/units/node/slave.py
@@ -0,0 +1,529 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import json
+import time
+import hashlib
+from os.path import exists
+
+from __init__ import tests
+
+from sugar_network import db, toolkit
+from sugar_network.client import Connection, keyfile
+from sugar_network.node import master_api
+from sugar_network.node.master import MasterRoutes
+from sugar_network.node.slave import SlaveRoutes
+from sugar_network.db.volume import Volume
+from sugar_network.model.user import User
+from sugar_network.toolkit.router import Router, File
+from sugar_network.toolkit import coroutine, http, parcel
+
+
+class SlaveTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+
+ class Document(db.Resource):
+
+ @db.indexed_property(db.Localized, slot=1, prefix='N', full_text=True)
+ def title(self, value):
+ return value
+
+ @db.indexed_property(db.Localized, prefix='D', full_text=True)
+ def message(self, value):
+ return value
+
+ @db.stored_property(db.Blob)
+ def blob(self, value):
+ return value
+
+ self.Document = Document
+ self.slave_volume = Volume('slave', [User, Document])
+ self.slave_routes = SlaveRoutes(volume=self.slave_volume)
+ self.slave_server = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.slave_routes))
+ coroutine.spawn(self.slave_server.serve_forever)
+ coroutine.dispatch()
+
+ def test_online_sync_Push(self):
+ self.fork_master([User, self.Document])
+ master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value))
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges')))
+
+ guid1 = slave.post(['document'], {'message': '1', 'title': ''})
+ guid2 = slave.post(['document'], {'message': '2', 'title': ''})
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'message': '1'},
+ {'guid': guid2, 'message': '2'},
+ ],
+ master.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[2, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges')))
+
+ guid3 = slave.post(['document'], {'message': '3', 'title': ''})
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'message': '1'},
+ {'guid': guid2, 'message': '2'},
+ {'guid': guid3, 'message': '3'},
+ ],
+ master.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[3, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ slave.put(['document', guid2], {'message': '22'})
+ slave.post(cmd='online_sync')
+ self.assertEqual('22', master.get(['document', guid2, 'message']))
+ self.assertEqual([[4, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[7, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ slave.delete(['document', guid1])
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid2, 'message': '22'},
+ {'guid': guid3, 'message': '3'},
+ ],
+ master.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[5, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[8, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ slave.put(['document', guid1], {'message': 'a'})
+ slave.put(['document', guid2], {'message': 'b'})
+ slave.put(['document', guid3], {'message': 'c'})
+ guid4 = slave.post(['document'], {'message': 'd', 'title': ''})
+ slave.delete(['document', guid2])
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid3, 'message': 'c'},
+ {'guid': guid4, 'message': 'd'},
+ ],
+ master.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[13, None]], json.load(file('slave/var/push.ranges')))
+
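Throughout these tests the slave persists its sync position under slave/var/: pull.ranges is the master seqno window still to be fetched, push.ranges the local seqno window not yet acknowledged, both stored as plain JSON range lists, which is why the assertions read them back directly:

    import json

    # after the first empty sync both windows start at 1:
    json.load(file('slave/var/pull.ranges'))  # [[1, None]]
    json.load(file('slave/var/push.ranges'))  # [[1, None]]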
+ def test_online_sync_Pull(self):
+ self.fork_master([User, self.Document])
+ master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value))
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges')))
+
+ guid1 = master.post(['document'], {'message': '1', 'title': ''})
+ guid2 = master.post(['document'], {'message': '2', 'title': ''})
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'message': '1'},
+ {'guid': guid2, 'message': '2'},
+ ],
+ slave.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[5, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges')))
+
+ guid3 = master.post(['document'], {'message': '3', 'title': ''})
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'message': '1'},
+ {'guid': guid2, 'message': '2'},
+ {'guid': guid3, 'message': '3'},
+ ],
+ slave.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[3, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ master.put(['document', guid2], {'message': '22'})
+ slave.post(cmd='online_sync')
+ self.assertEqual('22', slave.get(['document', guid2, 'message']))
+ self.assertEqual([[7, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ master.delete(['document', guid1])
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid2, 'message': '22'},
+ {'guid': guid3, 'message': '3'},
+ ],
+ slave.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[8, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges')))
+
+ coroutine.sleep(1)
+ master.put(['document', guid1], {'message': 'a'})
+ master.put(['document', guid2], {'message': 'b'})
+ master.put(['document', guid3], {'message': 'c'})
+ guid4 = master.post(['document'], {'message': 'd', 'title': ''})
+ master.delete(['document', guid2])
+ slave.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid3, 'message': 'c'},
+ {'guid': guid4, 'message': 'd'},
+ ],
+ slave.get(['document'], reply=['guid', 'message'])['result'])
+ self.assertEqual([[13, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges')))
+
+ def test_online_sync_PullBlobs(self):
+ self.fork_master([User, self.Document])
+ master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value))
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges')))
+
+ guid = master.post(['document'], {'message': '1', 'title': ''})
+ master.put(['document', guid, 'blob'], 'blob')
+ self.touch(('master/files/foo/bar', 'file'))
+
+ slave.post(cmd='online_sync')
+ self.assertEqual('blob', slave.request('GET', ['document', guid, 'blob']).content)
+ self.assertEqual('file', file('slave/files/foo/bar').read())
+
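test_online_sync_PullBlobs covers the two addressing modes a synced blob can have: content-addressed by the sha1 of its payload for ordinary uploads, or by an explicit path for files mirrored under files/. The digest side, as the offline tests below also use it:

    import hashlib

    digest = hashlib.sha1('blob').hexdigest()
    blob = self.slave_volume.blobs.get(digest)  # content-addressed lookup
    assert file(blob.path).read() == 'blob'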
+ def test_online_sync_PullFromPreviouslyMergedRecord(self):
+ self.fork_master([User, self.Document])
+ master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value))
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ slave.post(cmd='online_sync')
+ self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges')))
+ self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges')))
+
+ guid = slave.post(['document'], {'message': '1', 'title': '1'})
+ slave.post(cmd='online_sync')
+
+ coroutine.sleep(1)
+ master.put(['document', guid], {'message': '1_'})
+ slave.put(['document', guid], {'title': '1_'})
+ slave.post(cmd='online_sync')
+
+ self.assertEqual('1_', master.get(['document', guid, 'message']))
+ self.assertEqual('1_', master.get(['document', guid, 'title']))
+ self.assertEqual('1_', slave.get(['document', guid, 'message']))
+ self.assertEqual('1_', slave.get(['document', guid, 'title']))
+
+ def test_offline_sync_Import(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ self.touch(('blob1', 'a'))
+ self.touch(('blob2', 'bb'))
+ parcel.encode_dir([
+ ('push', {'from': '127.0.0.1:7777'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ File('./blob1', meta={'content-length': '1'}),
+ File('./blob2', meta={'content-length': '2', 'path': 'foo/bar'}),
+ {'commit': [[1, 2]]},
+ ]),
+ ('ack', {'ack': [[101, 103]], 'ranges': [[1, 3]], 'from': '127.0.0.1:7777', 'to': self.slave_routes.guid}, []),
+ ],
+ root='sync', limit=99999999)
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+
+ self.assertEqual(1, slave.get(['document', '1', 'ctime']))
+ self.assertEqual('a', file(self.slave_volume.blobs.get(hashlib.sha1('a').hexdigest()).path).read())
+ self.assertEqual('bb', file(self.slave_volume.blobs.get('foo/bar').path).read())
+ self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges')))
+ self.assertEqual([[3, 100], [104, None]], json.load(file('slave/var/pull.ranges')))
+
+ self.assertEqual(
+ sorted([
+ ({'from': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ {'content-length': '1'},
+ {'content-length': '2', 'path': 'foo/bar'},
+ {'commit': [[1, 2]]},
+ ]),
+ ({'ack': [[101, 103]], 'from': '127.0.0.1:7777', 'packet': 'ack', 'ranges': [[1, 3]], 'to': self.slave_routes.guid}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[3, 100], [104, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
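offline_sync runs against a directory instead of a socket: parcel.encode_dir() lays incoming packets out under the sync root, the slave merges whatever is addressed to it, and then appends its own push and pull packets for the return trip. Reduced to the essentials (records is a placeholder):

    parcel.encode_dir([
        ('push', {'from': '127.0.0.1:7777'}, records),
    ], root='sync', limit=99999999)
    slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
    # the directory now also contains the slave's own packets:
    [p.header['packet'] for p in parcel.decode_dir('sync')]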
+ def test_offline_sync_ImportPush(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ self.touch(('blob1', 'a'))
+ self.touch(('blob2', 'bb'))
+ parcel.encode_dir([
+ ('push', {'from': '127.0.0.1:7777'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ File('./blob1', meta={'content-length': '1'}),
+ File('./blob2', meta={'content-length': '2', 'path': 'foo/bar'}),
+ {'commit': [[1, 2]]},
+ ]),
+ ],
+ root='sync', limit=99999999)
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+
+ self.assertEqual(1, slave.get(['document', '1', 'ctime']))
+ self.assertEqual('a', file(self.slave_volume.blobs.get(hashlib.sha1('a').hexdigest()).path).read())
+ self.assertEqual('bb', file(self.slave_volume.blobs.get('foo/bar').path).read())
+ self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges')))
+ self.assertEqual([[3, None]], json.load(file('slave/var/pull.ranges')))
+
+ self.assertEqual(
+ sorted([
+ ({'from': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ {'content-length': '1'},
+ {'content-length': '2', 'path': 'foo/bar'},
+ {'commit': [[1, 2]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[3, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ def test_offline_sync_ImportAck(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ parcel.encode_dir([
+ ('ack', {'ack': [[101, 103]], 'ranges': [[1, 3]], 'from': '127.0.0.1:7777', 'to': self.slave_routes.guid}, []),
+ ],
+ root='sync', limit=99999999)
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+
+ self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges')))
+ self.assertEqual([[1, 100], [104, None]], json.load(file('slave/var/pull.ranges')))
+
+ self.assertEqual(
+ sorted([
+ ({'ack': [[101, 103]], 'from': '127.0.0.1:7777', 'packet': 'ack', 'ranges': [[1, 3]], 'to': self.slave_routes.guid}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, 100], [104, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ def test_offline_sync_GenerateRequestAfterImport(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ parcel.encode_dir([
+ ('push', {'from': 'another-slave'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ],
+ root='sync', limit=99999999)
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+
+ self.assertEqual(1, slave.get(['document', '1', 'ctime']))
+ self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges')))
+ self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges')))
+
+ self.assertEqual(
+ sorted([
+ ({'from': 'another-slave', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': '1', 'patch': {
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0},
+ 'title': {'value': {}, 'mtime': 0},
+ 'message': {'value': {}, 'mtime': 0},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'request', 'to': '127.0.0.1:7777', 'origin': 'another-slave', 'ranges': [[1, 1]]}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ def test_offline_sync_Export(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ class statvfs(object):
+
+ f_bfree = None
+ f_frsize = 1
+
+ self.override(os, 'statvfs', lambda *args: statvfs())
+ statvfs.f_bfree = 999999999
+ self.override(time, 'time', lambda: 0)
+
+ guid = slave.post(['document'], {'message': '', 'title': ''})
+ push_seqno = self.slave_volume.seqno.value + 1
+ self.slave_routes._push_r.value = [[push_seqno, None]]
+ slave.put(['document', guid, 'title'], 'probe')
+ self.slave_volume.blobs.post('a')
+ self.touch(('slave/files/foo/bar', 'bb'))
+ self.slave_volume.blobs.populate()
+
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+
+ self.assertEqual(
+ sorted([
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': 'probe'}, 'mtime': self.slave_volume['document'].get(guid).meta('title')['mtime']},
+ }},
+ {'resource': 'user'},
+ {'content-length': '1', 'content-type': 'application/octet-stream'},
+ {'commit': [[push_seqno, push_seqno + 1]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ def test_offline_sync_ContinuousExport(self):
+ slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value))
+
+ class statvfs(object):
+
+ f_bfree = None
+ f_frsize = 1
+
+ self.override(os, 'statvfs', lambda *args: statvfs())
+ self.override(time, 'time', lambda: 0)
+
+ guid1 = slave.post(['document'], {'message': '', 'title': ''})
+ guid2 = slave.post(['document'], {'message': '', 'title': ''})
+ push_seqno = self.slave_volume.seqno.value + 1
+ self.slave_routes._push_r.value = [[push_seqno, None]]
+
+ RECORD = 1024 * 1024
+ slave.put(['document', guid1, 'title'], '.' * RECORD)
+ slave.put(['document', guid2, 'title'], '.' * RECORD)
+ statvfs.f_bfree = parcel._RESERVED_DISK_SPACE + RECORD * 1.5
+
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+ self.assertEqual(
+ sorted([
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid1, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']},
+ }},
+ {'commit': [[push_seqno, push_seqno]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+ self.assertEqual(
+ sorted([
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid1, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']},
+ }},
+ {'commit': [[push_seqno, push_seqno]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid2, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid2).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid2).meta('title')['mtime']},
+ }},
+ {'resource': 'user'},
+ {'commit': [[push_seqno + 1, push_seqno + 1]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
+ slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync')
+ self.assertEqual(
+ sorted([
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid1, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']},
+ }},
+ {'commit': [[push_seqno, push_seqno]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'guid': guid2, 'patch': {
+ 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid2).meta('mtime')['mtime']},
+ 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid2).meta('title')['mtime']},
+ }},
+ {'resource': 'user'},
+ {'commit': [[push_seqno + 1, push_seqno + 1]]},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [
+ {'resource': 'document'},
+ {'resource': 'user'},
+ ]),
+ ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [
+ ]),
+ ]),
+ sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')]))
+
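The continuous-export test fakes os.statvfs so that only about one and a half records fit: export stops once free space minus parcel._RESERVED_DISK_SPACE is exhausted, and the next offline_sync appends a new packet continuing from the last committed seqno rather than rewriting the old one. The space check it relies on boils down to:

    free = statvfs.f_bfree * statvfs.f_frsize
    budget = free - parcel._RESERVED_DISK_SPACE
    # f_frsize == 1 and f_bfree == _RESERVED_DISK_SPACE + RECORD * 1.5
    # leaves room for exactly one RECORD-sized diff per packet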
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/stats_user.py b/tests/units/node/stats_user.py
deleted file mode 100755
index 49275b5..0000000
--- a/tests/units/node/stats_user.py
+++ /dev/null
@@ -1,123 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import json
-import time
-
-from __init__ import tests
-
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.node.stats_user import stats_user_step, stats_user_rras, diff, merge, commit
-
-
-class StatsTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
- stats_user_step.value = 1
- stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100']
-
- def test_diff(self):
- ts = int(time.time())
-
- rrd = Rrd('stats/user/dir1/user1', stats_user_step.value, stats_user_rras.value)
- rrd['db1'].put({'a': 1}, ts)
- rrd['db1'].put({'a': 2}, ts + 1)
-
- rrd = Rrd('stats/user/dir1/user2', stats_user_step.value, stats_user_rras.value)
- rrd['db2'].put({'b': 3}, ts)
-
- rrd = Rrd('stats/user/dir2/user3', stats_user_step.value, stats_user_rras.value)
- rrd['db3'].put({'c': 4}, ts)
- rrd['db4'].put({'d': 5}, ts)
-
- self.assertEqual([
- {'db': 'db3', 'user': 'user3'},
- {'timestamp': ts, 'values': {'c': 4.0}},
- {'db': 'db4', 'user': 'user3'},
- {'timestamp': ts, 'values': {'d': 5.0}},
- {'db': 'db2', 'user': 'user2'},
- {'timestamp': ts, 'values': {'b': 3.0}},
- {'db': 'db1', 'user': 'user1'},
- {'timestamp': ts, 'values': {'a': 1.0}},
- {'timestamp': ts + 1, 'values': {'a': 2.0}},
- {'commit': {
- 'user1': {
- 'db1': [[1, ts + 1]],
- },
- 'user2': {
- 'db2': [[1, ts]],
- },
- 'user3': {
- 'db3': [[1, ts]],
- 'db4': [[1, ts]],
- },
- }},
- ],
- [i for i in diff()])
-
- def test_merge(self):
- ts = int(time.time())
-
- self.assertEqual(
- 'info',
- merge([
- {'db': 'db3', 'user': 'user3'},
- {'timestamp': ts, 'values': {'c': 4.0}},
- {'db': 'db4', 'user': 'user3'},
- {'timestamp': ts, 'values': {'d': 5.0}},
- {'db': 'db2', 'user': 'user2'},
- {'timestamp': ts, 'values': {'b': 3.0}},
- {'db': 'db1', 'user': 'user1'},
- {'timestamp': ts, 'values': {'a': 1.0}},
- {'timestamp': ts + 1, 'values': {'a': 2.0}},
- {'commit': 'info'},
- ]))
-
- self.assertEqual([
- [('db1', ts, {'a': 1.0}), ('db1', ts + 1, {'a': 2.0})],
- ],
- [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user1', 1)])
-
- self.assertEqual([
- [('db2', ts, {'b': 3.0})],
- ],
- [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user2', 1)])
-
- self.assertEqual([
- [('db3', ts, {'c': 4.0})],
- [('db4', ts, {'d': 5.0})],
- ],
- [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user3', 1)])
-
- def test_commit(self):
- ts = int(time.time())
- commit({
- 'user1': {
- 'db1': [[1, ts + 1]],
- },
- 'user2': {
- 'db2': [[1, ts]],
- },
- 'user3': {
- 'db3': [[1, ts]],
- 'db4': [[1, ts]],
- },
- })
-
- self.assertEqual(
- [[ts + 2, None]],
- json.load(file('stats/user/us/user1/db1.push')))
- self.assertEqual(
- [[ts + 1, None]],
- json.load(file('stats/user/us/user2/db2.push')))
- self.assertEqual(
- [[ts + 1, None]],
- json.load(file('stats/user/us/user3/db3.push')))
- self.assertEqual(
- [[ts + 1, None]],
- json.load(file('stats/user/us/user3/db4.push')))
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/sync_master.py b/tests/units/node/sync_master.py
deleted file mode 100755
index c946396..0000000
--- a/tests/units/node/sync_master.py
+++ /dev/null
@@ -1,668 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import gzip
-import time
-import json
-import base64
-import hashlib
-from glob import glob
-from os.path import join, exists
-from StringIO import StringIO
-
-import rrdtool
-
-from __init__ import tests
-
-from sugar_network.db.directory import Directory
-from sugar_network import db, node, toolkit
-from sugar_network.node import sync
-from sugar_network.node.master import MasterRoutes
-from sugar_network.db.volume import Volume
-from sugar_network.toolkit import coroutine
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.toolkit.router import Response
-
-
-class statvfs(object):
-
- f_bfree = None
- f_frsize = 1
-
-
-class SyncMasterTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
-
- self.uuid = 0
- self.override(toolkit, 'uuid', self.next_uuid)
- self.override(os, 'statvfs', lambda *args: statvfs())
- statvfs.f_bfree = 999999999
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1, default='')
- def prop(self, value):
- return value
-
- node.files_root.value = 'sync'
- self.volume = Volume('master', [Document])
- self.master = MasterRoutes('127.0.0.1:8888', self.volume)
-
- def next_uuid(self):
- self.uuid += 1
- return str(self.uuid)
-
- def test_sync_ExcludeRecentlyMergedDiffFromPull(self):
- request = Request()
- for chunk in sync.encode([
- ('diff', None, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1},
- 'ctime': {'value': 1, 'mtime': 1},
- 'mtime': {'value': 1, 'mtime': 1},
- 'prop': {'value': 'value', 'mtime': 1},
- }},
- {'commit': [[1, 1]]},
- ]),
- ('pull', {'sequence': [[1, None]]}, None),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = StringIO()
- for chunk in self.master.sync(request):
- response.write(chunk)
- response.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': None}, []),
- ({'packet': 'diff', 'src': '127.0.0.1:8888'}, [{'resource': 'document'}, {'commit': []}]),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(response)])
-
- request = Request()
- for chunk in sync.encode([
- ('pull', {'sequence': [[1, None]]}, None),
- ('diff', None, [
- {'resource': 'document'},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 2},
- 'ctime': {'value': 2, 'mtime': 2},
- 'mtime': {'value': 2, 'mtime': 2},
- 'prop': {'value': 'value', 'mtime': 2},
- }},
- {'commit': [[2, 2]]},
- ]),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = StringIO()
- for chunk in self.master.sync(request):
- response.write(chunk)
- response.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[2, 2]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': None}, []),
- ({'packet': 'diff', 'src': '127.0.0.1:8888'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1},
- 'ctime': {'value': 1, 'mtime': 1},
- 'mtime': {'value': 1, 'mtime': 1},
- 'prop': {'value': 'value', 'mtime': 1},
- }},
- {'commit': [[1, 1]]},
- ]),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(response)])
-
- def test_sync_MisaddressedPackets(self):
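-        # Packets without a 'dst', or addressed to another node, must be
-        # rejected; only packets destined for the master guid are processed.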
- request = Request()
- for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)]):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
- self.assertRaises(RuntimeError, lambda: next(self.master.sync(request)))
-
- request = Request()
- for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)], dst='fake'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
- self.assertRaises(RuntimeError, lambda: next(self.master.sync(request)))
-
- request = Request()
- for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
- next(self.master.sync(request))
-
- def test_push_WithoutCookies(self):
- ts = int(time.time())
-
- request = Request()
- for chunk in sync.package_encode([
- ('diff', None, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1},
- 'ctime': {'value': 1, 'mtime': 1},
- 'mtime': {'value': 1, 'mtime': 1},
- 'prop': {'value': 'value', 'mtime': 1},
- }},
- {'commit': [[1, 1]]},
- ]),
- ('stats_diff', {'dst': '127.0.0.1:8888'}, [
- {'db': 'db', 'user': 'user'},
- {'timestamp': ts, 'values': {'field': 1.0}},
- {'commit': {'user': {'db': [[1, ts]]}}},
- ]),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': None, 'filename': '2.sneakernet'}, []),
- ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[1, ts]]}}, 'src': '127.0.0.1:8888', 'dst': None, 'filename': '2.sneakernet'}, []),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_push_WithCookies(self):
- ts = int(time.time())
-
- request = Request()
- for chunk in sync.package_encode([
- ('pull', {'sequence': [[1, None]]}, None),
- ('files_pull', {'sequence': [[1, None]]}, None),
- ('diff', None, [
- {'resource': 'document'},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 2},
- 'ctime': {'value': 2, 'mtime': 2},
- 'mtime': {'value': 2, 'mtime': 2},
- 'prop': {'value': 'value', 'mtime': 2},
- }},
- {'commit': [[2, 2]]},
- ]),
- ('stats_diff', {'dst': '127.0.0.1:8888'}, [
- {'db': 'db', 'user': 'user'},
- {'timestamp': ts + 1, 'values': {'field': 2.0}},
- {'commit': {'user': {'db': [[2, ts]]}}},
- ]),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': None, 'filename': '2.sneakernet'}, []),
- ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[2, ts]]}}, 'src': '127.0.0.1:8888', 'dst': None, 'filename': '2.sneakernet'}, []),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({None: [[1, 1]]})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_push_CollectCookies(self):
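-        # Pull requests are accumulated into the existing
-        # 'sugar_network_pull' cookie instead of replacing it.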
- request = Request()
- request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \
- base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])]))
- for chunk in sync.package_encode([
- ('pull', {'sequence': [[11, None]]}, None),
- ('pull', {'sequence': [[1, 2]]}, None),
- ('files_pull', {'sequence': [[11, None]]}, None),
- ('files_pull', {'sequence': [[3, 4]]}, None),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, 2], [10, None]]), ('files_pull', None, [[3, 4], [10, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({None: []})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- request = Request()
- request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \
- base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])]))
- for chunk in sync.package_encode([
- ('pull', {'sequence': [[1, 5]]}, None),
- ('files_pull', {'sequence': [[1, 5]]}, None),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, 5], [10, None]]), ('files_pull', None, [[1, 5], [10, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({None: []})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_push_DoNotExcludeAcksFromCookies(self):
- ts = int(time.time())
-
- request = Request()
- for chunk in sync.package_encode([
- ('pull', {'sequence': [[1, None]]}, None),
- ('diff', None, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1},
- 'ctime': {'value': 1, 'mtime': 1},
- 'mtime': {'value': 1, 'mtime': 1},
- 'prop': {'value': 'value', 'mtime': 1},
- }},
- {'commit': [[10, 10]]},
- ]),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[10, 10]], 'dst': None, 'filename': '2.sneakernet'}, []),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({None: [[1, 1]]})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_pull(self):
- self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1})
- self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2})
- self.utime('master', 0)
- self.touch(('sync/1', 'file1'))
- self.touch(('sync/2', 'file2'))
-
- request = Request()
- request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])]))
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
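-        # The first pull returns no data (note the delay=30 cookie); the
-        # packet is prepared asynchronously, so wait before re-requesting.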
- coroutine.sleep(.5)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0]
- response = Response()
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'diff'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'prop': {'value': '1', 'mtime': 0},
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0}},
- },
- {'guid': '2', 'diff': {
- 'prop': {'value': '2', 'mtime': 0},
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0}},
- },
- {'commit': [[1, 2]]},
- ])
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- request = Request()
- request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0]
- response = Response()
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- packets_iter = sync.decode(gzip.GzipFile(mode='r', fileobj=reply))
- with next(packets_iter) as packet:
- self.assertEqual('files_diff', packet.name)
- records_iter = iter(packet)
- self.assertEqual('file1', next(records_iter)['blob'].read())
- self.assertEqual('file2', next(records_iter)['blob'].read())
- self.assertEqual({'op': 'commit', 'sequence': [[1, 4]]}, next(records_iter))
- self.assertRaises(StopIteration, records_iter.next)
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_pull_EmptyPackets(self):
- self.master._pulls = {
- 'pull': lambda layer, in_seq, out_seq=None, exclude_seq=None, **kwargs: \
- ('diff', None, [{'layer': layer, 'seq': in_seq}]),
- }
-
- request = Request()
- request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]])]))
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
- coroutine.sleep(.5)
- self.assertEqual(1, len([i for i in glob('tmp/pulls/*.tag')]))
-
- request = Request()
- request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0]
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
- self.assertEqual(0, len([i for i in glob('tmp/pulls/*.tag')]))
-
- def test_pull_FullClone(self):
-
- def diff(layer, seq, out_seq):
- out_seq.include(1, 10)
- yield {'layer': layer, 'seq': seq}
-
- self.master._pulls = {
- 'pull': lambda layer, in_seq, out_seq, exclude_seq=None, **kwargs: ('diff', None, diff(layer, in_seq, out_seq)),
- 'files_pull': lambda layer, in_seq, out_seq, exclude_seq=None, **kwargs: ('files_diff', None, diff(layer, in_seq, out_seq)),
- }
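-        # The stubbed diff() includes [1, 10] into out_seq, so both pulls
-        # produce non-empty packets, delivered on consecutive requests.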
-
- request = Request()
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
- coroutine.sleep(.5)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0]
- response = Response()
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'diff'}, [{'layer': None, 'seq': [[1, None]]}]),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- request = Request()
- request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0]
- response = Response()
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'files_diff'}, [{'layer': None, 'seq': [[1, None]]}]),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_push_SetSentCookies(self):
- request = Request()
- for chunk in sync.package_encode([
- ('pull', {'src': '1', 'sequence': [[1, None]], 'layer': '1'}, None),
- ('pull', {'src': '1', 'sequence': [[11, None]], 'layer': '1'}, None),
- ('pull', {'src': '2', 'sequence': [[2, None]], 'layer': '2'}, None),
- ('pull', {'src': '2', 'sequence': [[22, None]], 'layer': '2'}, None),
- ('diff', {'src': '3'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 1},
- 'ctime': {'value': 1, 'mtime': 1},
- 'mtime': {'value': 1, 'mtime': 1},
- 'prop': {'value': 'value', 'mtime': 1},
- }},
- {'commit': [[1, 1]]},
- ]),
- ('diff', {'src': '3'}, [
- {'resource': 'document'},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 2},
- 'ctime': {'value': 2, 'mtime': 2},
- 'mtime': {'value': 2, 'mtime': 2},
- 'prop': {'value': 'value', 'mtime': 2},
- }},
- {'commit': [[2, 2]]},
- ]),
- ], dst='127.0.0.1:8888'):
- request.content_stream.write(chunk)
- request.content_stream.seek(0)
-
- response = Response()
- reply = StringIO()
- for chunk in self.master.push(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': '3', 'filename': '2.sneakernet'}, []),
- ({'packet': 'ack', 'ack': [[2, 2]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': '3', 'filename': '2.sneakernet'}, []),
- ],
- [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)])
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', '1', [[1, None]]), ('pull', '2', [[2, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({'1': [], '2': [], '3': [[1, 2]]})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_pull_ExcludeSentCookies(self):
- self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1})
- self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2})
- self.utime('master', 0)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = {
- 'sugar_network_pull': base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent': base64.b64encode(json.dumps({'slave': [[2, 2]]})),
- }
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({'slave': [[2, 2]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
- coroutine.sleep(.5)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(response.get('set-cookie'))
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'diff'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'prop': {'value': '1', 'mtime': 0},
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0}},
- },
- {'commit': [[1, 1]]},
- ])
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
-
- def test_pull_DoNotExcludeSentCookiesForMultipleNodes(self):
- self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1})
- self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2})
- self.utime('master', 0)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = {
- 'sugar_network_pull': base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent': base64.b64encode(json.dumps({'slave': [[2, 2]], 'other': []})),
- }
- response = Response()
- self.assertEqual(None, self.master.pull(request, response))
- self.assertEqual([
- 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
- 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \
- base64.b64encode(json.dumps({'slave': [[2, 2]], 'other': []})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ],
- response.get('set-cookie'))
- coroutine.sleep(.5)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(response.get('set-cookie'))
- reply = StringIO()
- for chunk in self.master.pull(request, response):
- reply.write(chunk)
- reply.seek(0)
- self.assertEqual([
- ({'packet': 'diff'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'prop': {'value': '1', 'mtime': 0},
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0}},
- },
- {'guid': '2', 'diff': {
- 'prop': {'value': '2', 'mtime': 0},
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0}},
- },
- {'commit': [[1, 2]]},
- ])
- ],
- [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
- self.assertEqual([
- 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly',
- 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('set-cookie'))
-
-    def __test_pull_LimitedPull(self):
- pass
-
- def __test_pull_ReusePullSeqFromCookies(self):
- pass
-
- def __test_pull_AskForNotYetReadyPull(self):
- pass
-
- def __test_pull_ProcessFilePulls(self):
- pass
-
- def __test_ReuseCachedPulls(self):
- pass
-
-
-class Request(object):
-
- def __init__(self):
- self.content_stream = StringIO()
- self.environ = {}
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/sync_offline.py b/tests/units/node/sync_offline.py
deleted file mode 100755
index 1c4ffd5..0000000
--- a/tests/units/node/sync_offline.py
+++ /dev/null
@@ -1,249 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import time
-import json
-import uuid
-from os.path import exists, join
-
-import rrdtool
-
-from __init__ import tests
-
-from sugar_network import db, node, toolkit
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.client import api_url
-from sugar_network.node import sync, stats_user, files_root
-from sugar_network.node.slave import SlaveRoutes
-from sugar_network.db import Volume
-from sugar_network.toolkit import coroutine
-
-
-class statvfs(object):
-
- f_bfree = None
- f_frsize = 1
-
-
-class SyncOfflineTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
- self.uuid = 0
- self.override(toolkit, 'uuid', self.next_uuid)
- self.override(os, 'statvfs', lambda *args: statvfs())
- statvfs.f_bfree = 999999999
- stats_user.stats_user_step.value = 1
- stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100']
- node.sync_layers.value = 'pilot'
-
- def next_uuid(self):
- self.uuid += 1
- return str(self.uuid)
-
- def test_FailOnFullDump(self):
-
- class Document(db.Resource):
- pass
-
- volume = Volume('node', [Document])
- cp = SlaveRoutes('node/key', volume)
-
- node.sync_layers.value = None
- self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt')
- node.sync_layers.value = 'public'
- cp.offline_sync(tests.tmpdir + '/mnt')
-
- def test_Export(self):
-
- class Document(db.Resource):
- pass
-
- volume = Volume('node', [Document])
- cp = SlaveRoutes('node/key', volume)
- stats_user.stats_user.value = True
-
- volume['document'].create({'guid': '1', 'prop': 'value1', 'ctime': 1, 'mtime': 1})
- volume['document'].create({'guid': '2', 'prop': 'value2', 'ctime': 2, 'mtime': 2})
- self.utime('node', 0)
-
- ts = int(time.time())
- rrd = Rrd('stats/user/dir/user', stats_user.stats_user_step.value, stats_user.stats_user_rras.value)
- rrd['db'].put({'field': 1}, ts)
-
- cp.offline_sync(tests.tmpdir + '/mnt')
- assert cp._offline_session is None
-
- self.assertEqual([
- ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0},
- }},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0},
- }},
- {'commit': [[1, 2]]},
- ]),
- ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [
- {'db': 'db', 'user': 'user'},
- {'timestamp': ts, 'values': {'field': 1.0}},
- {'commit': {'user': {'db': [[1, ts]]}}},
- ]),
- ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet'}, []),
- ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet', 'layer': ['pilot']}, []),
- ],
- sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('mnt')]))
- assert not exists('node/pull.sequence')
- assert not exists('node/push.sequence')
-
-    def test_ContinuedExport(self):
- payload = ''.join([str(uuid.uuid4()) for i in xrange(5000)])
-
- class Document(db.Resource):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = Volume('node', [Document])
- cp = SlaveRoutes('node/key', volume)
- stats_user.stats_user.value = True
-
- volume['document'].create({'guid': '1', 'prop': payload, 'ctime': 1, 'mtime': 1})
- volume['document'].create({'guid': '2', 'prop': payload, 'ctime': 2, 'mtime': 2})
- self.utime('node', 0)
-
- ts = int(time.time())
- rrd = Rrd('stats/user/dir/user', stats_user.stats_user_step.value, stats_user.stats_user_rras.value)
- rrd['db'].put({'field': 1}, ts)
-
- statvfs.f_bfree = len(payload) * 1.5 + sync._SNEAKERNET_RESERVED_SIZE
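-        # Leave free space for roughly one payload-sized record only, so the
-        # first export volume carries guid '1' alone and the session is kept
-        # open for continuation.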
- cp.offline_sync(tests.tmpdir + '/1')
- assert cp._offline_session is not None
-
- self.assertEqual([
- ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0},
- 'prop': {'value': payload, 'mtime': 0},
- }},
- {'commit': [[1, 1]]},
- ]),
- ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet'}, []),
- ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet', 'layer': ['pilot']}, []),
- ],
- sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('1')]))
-
- statvfs.f_bfree = 999999999
- cp.offline_sync(tests.tmpdir + '/2')
- assert cp._offline_session is None
-
- self.assertEqual([
- ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '3.sneakernet'}, [
- {'resource': 'document'},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0},
- 'prop': {'value': payload, 'mtime': 0},
- }},
- {'commit': [[2, 2]]},
- ]),
- ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '3.sneakernet'}, [
- {'db': 'db', 'user': 'user'},
- {'timestamp': ts, 'values': {'field': 1.0}},
- {'commit': {'user': {'db': [[1, ts]]}}},
- ]),
- ],
- sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('2')]))
-
- statvfs.f_bfree = 999999999
- cp.offline_sync(tests.tmpdir + '/3')
- assert cp._offline_session is None
-
- self.assertEqual([
- ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'filename': '5.sneakernet'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0},
- 'prop': {'value': payload, 'mtime': 0},
- }},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0},
- 'prop': {'value': payload, 'mtime': 0},
- }},
- {'commit': [[1, 2]]},
- ]),
- ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'filename': '5.sneakernet'}, [
- {'db': 'db', 'user': 'user'},
- {'timestamp': ts, 'values': {'field': 1.0}},
- {'commit': {'user': {'db': [[1, ts]]}}},
- ]),
- ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'sequence': [[1, None]], 'filename': '5.sneakernet'}, []),
- ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'sequence': [[1, None]], 'filename': '5.sneakernet', 'layer': ['pilot']}, []),
- ],
- sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('3')]))
-
- def test_Import(self):
- class Document(db.Resource):
- pass
-
- volume = Volume('node', [Document])
- cp = SlaveRoutes('node/key', volume)
- stats_user.stats_user.value = True
- files_root.value = 'files'
-
- ts = int(time.time())
- sync.sneakernet_encode([
- ('diff', {'src': '127.0.0.1:8888'}, [
- {'resource': 'document'},
- {'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': 0},
- 'ctime': {'value': 1, 'mtime': 0},
- 'mtime': {'value': 1, 'mtime': 0},
- }},
- {'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': 0},
- 'ctime': {'value': 2, 'mtime': 0},
- 'mtime': {'value': 2, 'mtime': 0},
- }},
- {'commit': [[1, 2]]},
- ]),
- ('files_diff', {'src': '127.0.0.1:8888'}, [
- {'op': 'update', 'blob_size': 1, 'blob': ['a'], 'path': '1'},
- {'op': 'update', 'blob_size': 2, 'blob': ['bb'], 'path': '2'},
- {'op': 'commit', 'sequence': [[1, 2]]},
- ]),
- ('ack', {'ack': [[101, 103]], 'sequence': [[1, 3]], 'src': '127.0.0.1:8888', 'dst': cp.guid}, []),
- ('stats_ack', {'sequence': {'user': {'db': [[1, ts]]}}, 'src': '127.0.0.1:8888', 'dst': cp.guid}, []),
- ],
- root='mnt')
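-        # Importing should merge both documents and files, drop the acked
-        # range [[1, 3]] from push.sequence, and exclude the merged diff
-        # [[1, 2]] plus the remote ack range [[101, 103]] from pull.sequence.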
-
- cp.offline_sync(tests.tmpdir + '/mnt')
- assert cp._offline_session is None
-
- self.assertEqual(
- ['1', '2'],
- [i.guid for i in volume['document'].find()[0]])
- self.assertEqual('a', file('files/1').read())
- self.assertEqual('bb', file('files/2').read())
- self.assertEqual([[4, None]], json.load(file('node/push.sequence')))
- self.assertEqual([[3, 100], [104, None]], json.load(file('node/pull.sequence')))
- self.assertEqual([[3, None]], json.load(file('node/files.sequence')))
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/sync_online.py b/tests/units/node/sync_online.py
deleted file mode 100755
index e2c864a..0000000
--- a/tests/units/node/sync_online.py
+++ /dev/null
@@ -1,272 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import json
-from os.path import exists
-
-from __init__ import tests
-
-from sugar_network import db, toolkit
-from sugar_network.client import Connection, api_url, keyfile
-from sugar_network.node import sync, stats_user, files_root
-from sugar_network.node.master import MasterRoutes
-from sugar_network.node.slave import SlaveRoutes
-from sugar_network.db.volume import Volume
-from sugar_network.model.user import User
-from sugar_network.toolkit.router import Router
-from sugar_network.toolkit import coroutine, http
-
-
-class SyncOnlineTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
-
- self.stats_commit = []
- self.stats_merge = []
- def stats_diff():
- yield {'stats': 'probe'}
- self.override(stats_user, 'diff', stats_diff)
- def stats_merge(packet):
- self.stats_merge.extend([i for i in packet])
- return 'ok'
- self.override(stats_user, 'merge', stats_merge)
- self.override(stats_user, 'commit', lambda seq: self.stats_commit.append(seq))
-
- class Document(db.Resource):
-
- @db.indexed_property(prefix='C')
- def context(self, value):
- return value
-
- @db.indexed_property(prefix='T')
- def type(self, value):
- return value
-
- @db.indexed_property(db.Localized, slot=1, prefix='N', full_text=True)
- def title(self, value):
- return value
-
- @db.indexed_property(db.Localized, prefix='D', full_text=True)
- def message(self, value):
- return value
-
- api_url.value = 'http://127.0.0.1:9000'
-
- files_root.value = 'master/files'
- self.master_volume = Volume('master', [User, Document])
- self.master_server = coroutine.WSGIServer(('127.0.0.1', 9000), Router(MasterRoutes('127.0.0.1:9000', self.master_volume)))
- coroutine.spawn(self.master_server.serve_forever)
- coroutine.dispatch()
-
- files_root.value = 'slave/files'
- self.slave_volume = Volume('slave', [User, Document])
- self.slave_server = coroutine.WSGIServer(('127.0.0.1', 9001), Router(SlaveRoutes('slave/node', self.slave_volume)))
- coroutine.spawn(self.slave_server.serve_forever)
- coroutine.dispatch()
-
- def tearDown(self):
- self.master_server.stop()
- self.slave_server.stop()
- tests.Test.tearDown(self)
-
- def test_Push(self):
- client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value))
-
- # Sync users
- client.get(cmd='logon')
- client.post(cmd='online-sync')
- self.assertEqual([[4, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- guid1 = client.post(['document'], {'context': '', 'message': '1', 'title': '', 'type': 'post'})
- guid2 = client.post(['document'], {'context': '', 'message': '2', 'title': '', 'type': 'post'})
-
- client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}},
- {'guid': guid2, 'message': {'en-us': '2'}},
- ],
- [i.properties(['guid', 'message']) for i in self.master_volume['document'].find()[0]])
- self.assertEqual([[6, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[4, None]], json.load(file('slave/push.sequence')))
-
- guid3 = client.post(['document'], {'context': '', 'message': '3', 'title': '', 'type': 'post'})
- client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}},
- {'guid': guid2, 'message': {'en-us': '2'}},
- {'guid': guid3, 'message': {'en-us': '3'}},
- ],
- [i.properties(['guid', 'message']) for i in self.master_volume['document'].find()[0]])
- self.assertEqual([[7, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[5, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.put(['document', guid2], {'message': '22'})
- client.post(cmd='online-sync')
- self.assertEqual(
- {'guid': guid2, 'message': {'en-us': '22'}},
- self.master_volume['document'].get(guid2).properties(['guid', 'message']))
- self.assertEqual([[8, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[6, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.delete(['document', guid1])
- client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}, 'layer': ['deleted']},
- {'guid': guid2, 'message': {'en-us': '22'}, 'layer': []},
- {'guid': guid3, 'message': {'en-us': '3'}, 'layer': []},
- ],
- [i.properties(['guid', 'message', 'layer']) for i in self.master_volume['document'].find()[0]])
- self.assertEqual([[9, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[7, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.put(['document', guid1], {'message': 'a'})
- client.put(['document', guid2], {'message': 'b'})
- client.put(['document', guid3], {'message': 'c'})
- guid4 = client.post(['document'], {'context': '', 'message': 'd', 'title': '', 'type': 'post'})
- client.delete(['document', guid2])
- client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': 'a'}, 'layer': ['deleted']},
- {'guid': guid2, 'message': {'en-us': 'b'}, 'layer': ['deleted']},
- {'guid': guid3, 'message': {'en-us': 'c'}, 'layer': []},
- {'guid': guid4, 'message': {'en-us': 'd'}, 'layer': []},
- ],
- [i.properties(['guid', 'message', 'layer']) for i in self.master_volume['document'].find()[0]])
- self.assertEqual([[13, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[12, None]], json.load(file('slave/push.sequence')))
-
- def test_PushStats(self):
- stats_user.stats_user.value = True
- client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value))
- client.post(cmd='online-sync')
- self.assertEqual(['ok'], self.stats_commit)
- self.assertEqual([{'stats': 'probe'}], self.stats_merge)
-
- def test_Pull(self):
- client = Connection('http://127.0.0.1:9000', auth=http.SugarAuth(keyfile.value))
- slave_client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value))
-
- # Sync users
- slave_client.get(cmd='logon')
- slave_client.post(cmd='online-sync')
- self.assertEqual([[4, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- guid1 = client.post(['document'], {'context': '', 'message': '1', 'title': '', 'type': 'post'})
- guid2 = client.post(['document'], {'context': '', 'message': '2', 'title': '', 'type': 'post'})
-
- slave_client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}},
- {'guid': guid2, 'message': {'en-us': '2'}},
- ],
- [i.properties(['guid', 'message']) for i in self.slave_volume['document'].find()[0]])
- self.assertEqual([[6, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- guid3 = client.post(['document'], {'context': '', 'message': '3', 'title': '', 'type': 'post'})
- slave_client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}},
- {'guid': guid2, 'message': {'en-us': '2'}},
- {'guid': guid3, 'message': {'en-us': '3'}},
- ],
- [i.properties(['guid', 'message']) for i in self.slave_volume['document'].find()[0]])
- self.assertEqual([[7, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.put(['document', guid2], {'message': '22'})
- slave_client.post(cmd='online-sync')
- self.assertEqual(
- {'guid': guid2, 'message': {'en-us': '22'}},
- self.slave_volume['document'].get(guid2).properties(['guid', 'message']))
- self.assertEqual([[8, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.delete(['document', guid1])
- slave_client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': '1'}, 'layer': ['deleted']},
- {'guid': guid2, 'message': {'en-us': '22'}, 'layer': []},
- {'guid': guid3, 'message': {'en-us': '3'}, 'layer': []},
- ],
- [i.properties(['guid', 'message', 'layer']) for i in self.slave_volume['document'].find()[0]])
- self.assertEqual([[9, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- coroutine.sleep(1)
- client.put(['document', guid1], {'message': 'a'})
- client.put(['document', guid2], {'message': 'b'})
- client.put(['document', guid3], {'message': 'c'})
- guid4 = client.post(['document'], {'context': '', 'message': 'd', 'title': '', 'type': 'post'})
- client.delete(['document', guid2])
- slave_client.post(cmd='online-sync')
- self.assertEqual([
- {'guid': guid1, 'message': {'en-us': 'a'}, 'layer': ['deleted']},
- {'guid': guid2, 'message': {'en-us': 'b'}, 'layer': ['deleted']},
- {'guid': guid3, 'message': {'en-us': 'c'}, 'layer': []},
- {'guid': guid4, 'message': {'en-us': 'd'}, 'layer': []},
- ],
- [i.properties(['guid', 'message', 'layer']) for i in self.slave_volume['document'].find()[0]])
- self.assertEqual([[14, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- def test_PullFiles(self):
- self.touch(('master/files/1', 'a', 1))
- self.touch(('master/files/2/2', 'bb', 2))
- self.touch(('master/files/3/3/3', 'ccc', 3))
- os.utime('master/files', (1, 1))
-
- client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value))
- client.post(cmd='online-sync')
-
- files, stamp = json.load(file('master/files.index'))
- self.assertEqual(1, stamp)
- self.assertEqual(sorted([
- [2, '1', 1],
- [3, '2/2', 2],
- [4, '3/3/3', 3],
- ]),
- sorted(files))
-
- self.assertEqual([[5, None]], json.load(file('slave/files.sequence')))
- self.assertEqual('a', file('slave/files/1').read())
- self.assertEqual('bb', file('slave/files/2/2').read())
- self.assertEqual('ccc', file('slave/files/3/3/3').read())
-
- def test_PullFromPreviouslyMergedRecord(self):
- master = Connection('http://127.0.0.1:9000', auth=http.SugarAuth(keyfile.value))
- slave = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value))
-
- # Sync users
- slave.get(cmd='logon')
- slave.post(cmd='online-sync')
- self.assertEqual([[4, None]], json.load(file('slave/pull.sequence')))
- self.assertEqual([[2, None]], json.load(file('slave/push.sequence')))
-
- guid = slave.post(['document'], {'context': '', 'message': '1', 'title': '1', 'type': 'post'})
- slave.post(cmd='online-sync')
-
- coroutine.sleep(1)
- master.put(['document', guid], {'message': '1_'})
- slave.put(['document', guid], {'title': '1_'})
- slave.post(cmd='online-sync')
-
- self.assertEqual(
- {'message': {'en-us': '1_'}, 'title': {'en-us': '1_'}},
- self.master_volume['document'].get(guid).properties(['message', 'title']))
- self.assertEqual(
- {'message': {'en-us': '1_'}, 'title': {'en-us': '1_'}},
- self.slave_volume['document'].get(guid).properties(['message', 'title']))
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py
index 89702a3..ee726bd 100644
--- a/tests/units/toolkit/__main__.py
+++ b/tests/units/toolkit/__main__.py
@@ -6,7 +6,6 @@ from coroutine import *
from http import *
from lsb_release import *
from mountpoints import *
-from rrd import *
from toolkit import *
from options import *
from spec import *
diff --git a/tests/units/toolkit/rrd.py b/tests/units/toolkit/rrd.py
deleted file mode 100755
index 3434634..0000000
--- a/tests/units/toolkit/rrd.py
+++ /dev/null
@@ -1,291 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import time
-
-import rrdtool
-
-from __init__ import tests
-
-from sugar_network.toolkit import rrd
-
-
-class RrdTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
- rrd._FETCH_PAGE = 100
- rrd._rrdtool = rrdtool
-
- def test_Db(self):
- ts = int(time.time()) + 100
-
- rrdtool.create('test.rrd',
- '--start', str(ts),
- '-s', '1',
- 'DS:f1:GAUGE:1:2:3',
- 'DS:f2:COUNTER:4:5:6',
- 'RRA:AVERAGE:0.1:7:8',
- 'RRA:LAST:0.2:9:10',
- )
-
- db = rrd._Db('test.rrd')
- self.assertEqual(1, db.step)
- self.assertEqual(ts, db.last)
- self.assertEqual(2, len(db.fields))
- self.assertEqual(2, len(db.rras))
-
- self.assertEqual('f1', db.fields[0]['name'])
- self.assertEqual('GAUGE', db.fields[0]['type'])
- self.assertEqual(1, db.fields[0]['minimal_heartbeat'])
- self.assertEqual(2, db.fields[0]['min'])
- self.assertEqual(3, db.fields[0]['max'])
-
- self.assertEqual('f2', db.fields[1]['name'])
- self.assertEqual('COUNTER', db.fields[1]['type'])
- self.assertEqual(4, db.fields[1]['minimal_heartbeat'])
- self.assertEqual(5, db.fields[1]['min'])
- self.assertEqual(6, db.fields[1]['max'])
-
- self.assertEqual('RRA:AVERAGE:0.1:7:8', db.rras[0])
- self.assertEqual('RRA:LAST:0.2:9:10', db.rras[1])
-
- def test_DbSet_load(self):
- rrdtool.create('1.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8')
- rrdtool.create('2.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8')
- rrdtool.create('3.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8')
-
- dbset = rrd._DbSet('.', None, None, None)
- dbset.load('1.rrd', 1)
- self.assertEqual(
- ['./1.rrd'],
- [i.path for i in dbset._revisions])
-        dbset.load('2.rrd', 2)
- self.assertEqual(
- ['./1.rrd', './2.rrd'],
- [i.path for i in dbset._revisions])
- dbset.load('3.rrd', 3)
- self.assertEqual(
- ['./1.rrd', './2.rrd', './3.rrd'],
- [i.path for i in dbset._revisions])
-
- dbset = rrd._DbSet('.', None, None, None)
- dbset.load('3.rrd', 3)
- self.assertEqual(
- ['./3.rrd'],
- [i.path for i in dbset._revisions])
- dbset.load('2.rrd', 2)
- self.assertEqual(
- ['./2.rrd', './3.rrd'],
- [i.path for i in dbset._revisions])
- dbset.load('1.rrd', 1)
- self.assertEqual(
- ['./1.rrd', './2.rrd', './3.rrd'],
- [i.path for i in dbset._revisions])
-
- def test_DbSet_put_ToNewDbAndSkipOlds(self):
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
-
- ts = int(time.time())
- dbset.put({'f1': 1, 'f2': 1}, ts)
- __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3))
- self.assertEqual('f1', f1)
- self.assertEqual('f2', f2)
- assert (1, 1) in values
-
- dbset.put({'f1': 2, 'f2': 2}, ts)
- ts = int(time.time())
- __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3))
- assert (2, 2) not in values
-
- dbset.put({'f1': 3, 'f2': 3}, ts + 1)
- ts = int(time.time())
- __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3))
- assert (3, 3) in values
-
- def test_DbSet_put_WithChangedLayout(self):
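-        # Changing the field layout forces a new database revision: a put()
-        # at the same timestamp is dropped, a later one lands in test-1.rrd.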
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1}, ts)
- self.assertEqual('./test.rrd', dbset._get_db(0).path)
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- dbset.put({'f1': 2, 'f2': 2}, ts)
- assert dbset._get_db(0) is None
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- dbset.put({'f1': 2, 'f2': 2}, ts + 1)
- self.assertEqual('./test-1.rrd', dbset._get_db(0).path)
-
- __, __, values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 10))
- assert (1,) in values
- assert (2, 2) not in values
-
- __, __, values = rrdtool.fetch('test-1.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 10))
- assert (1,) not in values
- assert (2, 2) in values
-
- def test_DbSet_put_WithChangedRRA(self):
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1}, ts)
- self.assertEqual('./test.rrd', dbset._get_db(0).path)
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.1:1:10'])
- dbset.load('test.rrd', 0)
- dbset.put({'f1': 1}, ts + 1)
- self.assertEqual('./test-1.rrd', dbset._get_db(0).path)
-
- def test_DbSet_put_WithChangedStep(self):
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1}, ts)
- self.assertEqual('./test.rrd', dbset._get_db(0).path)
-
- dbset = rrd._DbSet('.', 'test', 2, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- dbset.put({'f1': 1}, ts + 2)
- self.assertEqual('./test-1.rrd', dbset._get_db(0).path)
-
- def test_DbSet_get_OneDb(self):
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1, 'f2': 1}, ts)
- dbset.put({'f1': 2, 'f2': 2}, ts + 1)
- dbset.put({'f1': 3, 'f2': 3}, ts + 2)
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0})],
- [(t, i) for t, i in dbset.get(ts, ts)])
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 1, {'f1': 2.0, 'f2': 2.0})],
- [(t, i) for t, i in dbset.get(ts, ts + 1)])
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 1, {'f1': 2.0, 'f2': 2.0}), (ts + 2, {'f1': 3.0, 'f2': 3.0})],
- [(t, i) for t, i in dbset.get(ts, ts + 2)])
-
- def test_DbSet_get_OneDbLongSteps(self):
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 3, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1, 'f2': 1}, ts)
- dbset.put({'f1': 2, 'f2': 2}, ts + 3)
- dbset.put({'f1': 3, 'f2': 3}, ts + 6)
-
- ts = ts / 3 * 3
-
- dbset = rrd._DbSet('.', 'test', 3, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0})],
- [(t, i) for t, i in dbset.get(ts, ts)])
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 3, {'f1': 2.0, 'f2': 2.0})],
- [(t, i) for t, i in dbset.get(ts, ts + 3)])
- self.assertEqual(
- [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 3, {'f1': 2.0, 'f2': 2.0}), (ts + 6, {'f1': 3.0, 'f2': 3.0})],
- [(t, i) for t, i in dbset.get(ts, ts + 6)])
-
-    def test_DbSet_get_MultipleDbs(self):
- ts = int(time.time())
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.put({'f1': 1, 'f2': 1}, ts)
- ts = dbset._get_db(0).last
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.6:1:10'])
- dbset.load('test.rrd', 0)
- dbset.put({'f1': 2, 'f2': 2}, ts + 1)
- dbset.put({'f1': 3, 'f2': 3}, ts + 2)
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.7:1:10'])
- dbset.load('test.rrd', 0)
- dbset.load('test-1.rrd', 1)
- dbset.put({'f1': 4, 'f2': 4}, ts + 3)
- dbset.put({'f1': 5, 'f2': 5}, ts + 4)
- dbset.put({'f1': 6, 'f2': 6}, ts + 5)
-
- dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10'])
- dbset.load('test.rrd', 0)
- dbset.load('test-1.rrd', 1)
- dbset.load('test-2.rrd', 2)
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts)])
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- (ts + 1, {'f1': 2.0, 'f2': 2.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts + 1)])
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- (ts + 1, {'f1': 2.0, 'f2': 2.0}),
- (ts + 2, {'f1': 3.0, 'f2': 3.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts + 2)])
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- (ts + 1, {'f1': 2.0, 'f2': 2.0}),
- (ts + 2, {'f1': 3.0, 'f2': 3.0}),
- (ts + 3, {'f1': 4.0, 'f2': 4.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts + 3)])
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- (ts + 1, {'f1': 2.0, 'f2': 2.0}),
- (ts + 2, {'f1': 3.0, 'f2': 3.0}),
- (ts + 3, {'f1': 4.0, 'f2': 4.0}),
- (ts + 4, {'f1': 5.0, 'f2': 5.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts + 4)])
- self.assertEqual(
- [
- (ts, {'f1': 1.0, 'f2': 1.0}),
- (ts + 1, {'f1': 2.0, 'f2': 2.0}),
- (ts + 2, {'f1': 3.0, 'f2': 3.0}),
- (ts + 3, {'f1': 4.0, 'f2': 4.0}),
- (ts + 4, {'f1': 5.0, 'f2': 5.0}),
- (ts + 5, {'f1': 6.0, 'f2': 6.0}),
- ],
- [(t, i) for t, i in dbset.get(ts, ts + 5)])
-
- def test_NoTimestampDupes(self):
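-        # Across multi-resolution RRAs, a long fetch must yield strictly
-        # increasing timestamps and values, with no duplicates at RRA borders.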
- start_ts = int(time.time())
- end_ts = start_ts + 86400 * 3
-
- dbset = rrd._DbSet('.', 'test', 300, [
- 'RRA:AVERAGE:0.5:1:288',
- 'RRA:AVERAGE:0.5:3:672',
- 'RRA:AVERAGE:0.5:12:744',
- 'RRA:AVERAGE:0.5:144:732',
- ])
- for i in xrange((end_ts - start_ts) / 300):
- dbset.put({'f': i}, start_ts + i * 300)
-
- prev_ts = -1
- prev_value = -1
- for ts, value in dbset.get(start_ts, end_ts, 86400):
- value = value['f']
- assert ts > prev_ts
- assert value > prev_value
- prev_ts = ts
- prev_value = value
-
-
-if __name__ == '__main__':
- tests.main()