diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-07 17:02:19 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-09 05:15:13 (GMT) |
commit | 41cb9e831db3d66973292fbdb4c13fb658ac9f59 (patch) | |
tree | dec6b37654de3f0d6d12dca6c413b2890cd8ef9f | |
parent | 90f74541ec4925bad47466e39517c22ff7eadfe4 (diff) |
Fix node synchronization; remove usage stats, it should be standalone project/process
42 files changed, 1513 insertions, 3952 deletions
diff --git a/sugar-network b/sugar-network index 63e1078..04caa59 100755 --- a/sugar-network +++ b/sugar-network @@ -78,7 +78,7 @@ class ClientRouter(Router, ClientRoutes): home = db.Volume(client.path('db'), RESOURCES) Router.__init__(self, self) ClientRoutes.__init__(self, home, - client.api_url.value if not offline.value else None, + client.api.value if not offline.value else None, no_subscription=True) if not offline.value: for __ in self.subscribe(event='inline', state='online'): @@ -139,7 +139,7 @@ class Application(application.Application): if self.check_for_instance(): conn = IPCConnection() else: - conn = Connection(client.api_url.value) + conn = Connection(client.api.value) guid = conn.upload(['release'], path, cmd='submit', **props) if porcelain.value: diff --git a/sugar-network-client b/sugar-network-client index 87bf65e..3891c99 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -43,7 +43,6 @@ class Application(application.Daemon): def __init__(self, **kwargs): application.Daemon.__init__(self, **kwargs) - node.sync_layers.value = client.layers.value self.jobs = coroutine.Pool() new_root = (client.local_root.value != client.local_root.default) @@ -102,7 +101,7 @@ class Application(application.Daemon): def run(self): volume = db.Volume(client.path('db'), RESOURCES) routes = CachedClientRoutes(volume, - client.api_url.value if not client.server_mode.value else None) + client.api.value if not client.server_mode.value else None) router = Router(routes, allow_spawn=True) logging.info('Listening for IPC requests on %s port', @@ -203,7 +202,7 @@ Option.seek('main', [toolkit.cachedir]) Option.seek('webui', webui) Option.seek('client', client) Option.seek('db', db) -Option.seek('node', [node.port, node.find_limit, node.sync_layers]) +Option.seek('node', [node.port, node.find_limit]) Option.seek('user-stats', stats_user) app = Application( diff --git a/sugar-network-node b/sugar-network-node index 6721337..f35e8ad 100755 --- a/sugar-network-node +++ b/sugar-network-node @@ -19,16 +19,14 @@ import os import locale import gettext import logging -from os.path import exists, join +from os.path import exists from gevent import monkey -from sugar_network import db, node, client, toolkit, model -from sugar_network.node import stats_user, obs, master, slave -from sugar_network.node import model as master_model -from sugar_network.node.routes import generate_node_stats +from sugar_network import db, node, toolkit, model +from sugar_network.node import obs, master, slave from sugar_network.toolkit.http import Connection -from sugar_network.toolkit.router import Router, Request, Response +from sugar_network.toolkit.router import Router from sugar_network.toolkit import coroutine, application, Option, enforce @@ -55,20 +53,16 @@ class Application(application.Daemon): if node.certfile.value: ssl_args['certfile'] = node.certfile.value - master_path = join(node.data_root.value, 'master') - if exists(master_path): - with file(master_path) as f: - node_key = f.read().strip() + if node.master_mode.value: node_class = master.MasterRoutes - resources = master_model.RESOURCES - logging.info('Start %s node in master mode', node_key) + resources = master.RESOURCES + logging.info('Start master node') else: - node_key = join(node.data_root.value, 'node.key') node_class = slave.SlaveRoutes resources = model.RESOURCES logging.info('Start slave node') volume = db.Volume(node.data_root.value, resources) - cp = node_class(node_key, volume, find_limit=node.find_limit.value) + cp = node_class(volume=volume, find_limit=node.find_limit.value) self.jobs.spawn(volume.populate) logging.info('Listening for requests on %s:%s', @@ -81,7 +75,6 @@ class Application(application.Daemon): try: self.jobs.join() finally: - cp.close() volume.close() def shutdown(self): @@ -102,14 +95,6 @@ class Application(application.Daemon): path = self.args.pop(0) self._ensure_instance().post(cmd='offline-sync', path=path) - @application.command( - 're-generate node statistics', name='restat') - def restat(self): - enforce(not self.check_for_instance(), 'Shutdown the server at first') - volume = db.Volume(node.data_root.value, model.RESOURCES) - volume.populate() - generate_node_stats(volume, join(node.stats_root.value, 'node')) - def _ensure_instance(self): enforce(self.check_for_instance(), 'Node is not started') return Connection('http://localhost:%s' % @@ -126,9 +111,7 @@ locale.setlocale(locale.LC_ALL, '') Option.seek('main', application) Option.seek('main', [toolkit.cachedir]) -Option.seek('client', [client.api_url]) Option.seek('node', node) -Option.seek('user-stats', stats_user) Option.seek('obs', obs) Option.seek('db', db) diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index cd795c6..52bc324 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -18,7 +18,7 @@ import logging import hashlib import mimetypes from contextlib import contextmanager -from os.path import exists, abspath, join, dirname +from os.path import exists, abspath, join, dirname, isdir from sugar_network import toolkit from sugar_network.toolkit.router import File @@ -104,10 +104,16 @@ class Blobs(object): path = self.path(digest) if exists(path + _META_SUFFIX): return File(path, digest, _read_meta(path)) + elif isdir(path): + return _lsdir(path, digest) def delete(self, path): self._delete(path, None) + def populate(self, path=None, recursive=True): + for __ in self.diff([[1, None]], path or '', recursive): + pass + def diff(self, r, path=None, recursive=True): if path is None: is_files = False @@ -228,3 +234,10 @@ def _read_meta(path): key, value = line.split(':', 1) meta[key] = value.strip() return meta + + +def _lsdir(root, rel_root): + for filename in os.listdir(root): + path = join(root, filename) + if exists(path + _META_SUFFIX): + yield File(path, join(rel_root, filename), _read_meta(path)) diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 71a3efd..9d94929 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -123,7 +123,6 @@ class Resource(object): def diff(self, r): patch = {} - last_seqno = None for name, prop in self.metadata.items(): if name == 'seqno' or prop.acl & ACL.CALC: continue @@ -133,16 +132,16 @@ class Resource(object): seqno = meta.get('seqno') if not ranges.contains(r, seqno): continue - last_seqno = max(seqno, last_seqno) value = meta.get('value') if isinstance(prop, Aggregated): value_ = {} for key, agg in value.items(): - if ranges.contains(r, agg.pop('seqno')): + agg_seqno = agg.pop('seqno') + if ranges.contains(r, agg_seqno): value_[key] = agg value = value_ patch[name] = {'mtime': meta['mtime'], 'value': value} - return last_seqno, patch + return patch def format_patch(self, props): if not props: diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 5ec5683..5d9bac1 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -76,8 +76,15 @@ class Volume(dict): for __ in cls.populate(): coroutine.dispatch() - def diff(self, r, files=None, one_way=False): + def diff(self, r, exclude=None, files=None, one_way=False): + if exclude: + include = deepcopy(r) + ranges.exclude(include, exclude) + else: + include = r last_seqno = None + found = False + try: for resource, directory in self.items(): if one_way and directory.resource.one_way: @@ -90,33 +97,34 @@ class Volume(dict): query += str(end) docs, __ = directory.find(query=query, order_by='seqno') for doc in docs: - seqno, patch = doc.diff(r) - if not patch: - continue - yield {'guid': doc.guid, 'patch': patch} - last_seqno = max(last_seqno, seqno) - for blob in self.blobs.diff(r): + patch = doc.diff(include) + if patch: + yield {'guid': doc.guid, 'patch': patch} + found = True + last_seqno = max(last_seqno, doc['seqno']) + for blob in self.blobs.diff(include): seqno = int(blob.pop('x-seqno')) yield blob + found = True last_seqno = max(last_seqno, seqno) for dirpath in files or []: - for blob in self.blobs.diff(r, dirpath): + for blob in self.blobs.diff(include, dirpath): seqno = int(blob.pop('x-seqno')) yield blob + found = True last_seqno = max(last_seqno, seqno) except StopIteration: pass - if last_seqno: - commit_r = deepcopy(r) + if found: + commit_r = include if exclude else deepcopy(r) ranges.exclude(commit_r, last_seqno + 1, None) ranges.exclude(r, None, last_seqno) yield {'commit': commit_r} def patch(self, records): directory = None - commit_r = [] - merged_r = [] + committed = [] seqno = None for record in records: @@ -137,12 +145,10 @@ class Volume(dict): commit = record.get('commit') if commit is not None: - ranges.include(commit_r, commit) + ranges.include(committed, commit) continue - if seqno is not None: - ranges.include(merged_r, seqno, seqno) - return commit_r, merged_r + return seqno, committed def __enter__(self): return self diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index a4360b4..14d675c 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -34,28 +34,17 @@ certfile = Option( name='certfile') data_root = Option( - 'path to a directory to place server data', + 'path to a directory to place node data', default='/var/lib/sugar-network', name='data_root') find_limit = Option( 'limit the resulting list for search requests', default=64, type_cast=int, name='find-limit') -stats_root = Option( - 'path to the root directory for placing stats', - default='/var/lib/sugar-network/stats', name='stats_root') +mode = Option( + 'node running mode, should be one of "slave", "proxy", or, "master"', + default='slave') -files_root = Option( - 'path to a directory to keep files synchronized between nodes', - default='/var/lib/sugar-network/files', name='files_root') - -pull_timeout = Option( - 'delay in seconds to return to sync-pull requester to wait until ' - 'pull request will be ready', - default=30, type_cast=int) - -sync_layers = Option( - 'comma separated list of layers to restrict Sugar Network ' - 'synchronization content', - default=[], type_cast=Option.list_cast, - type_repr=Option.list_repr, name='sync-layers') +master_api = Option( + 'master API url either to connect to (for slave or proxy nodes), or,' + 'to provide from (for master nodes)') diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py deleted file mode 100644 index b7156bc..0000000 --- a/sugar_network/node/downloads.py +++ /dev/null @@ -1,128 +0,0 @@ -# Copyright (C) 2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Persistent pool with temporary files prepared to download.""" - -import os -import json -import hashlib -import logging -from glob import glob -from os.path import join, splitext, exists - -from sugar_network.toolkit import pylru, coroutine, exception - - -# Maximum numer of postponed pulls master can handle at the same time -_POOL_SIZE = 256 -_TAG_SUFFIX = '.tag' - -_logger = logging.getLogger('node.downloads') - - -class Pool(object): - - def __init__(self, root): - self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop()) - if not exists(root): - os.makedirs(root) - self._root = root - - for tag_path in glob(join(root, '*.tag')): - path, __ = splitext(tag_path) - if exists(path): - try: - with file(tag_path) as f: - key, tag = json.load(f) - pool_key = json.dumps(key) - self._pool[pool_key] = _Download(key, tag, path) - continue - except Exception: - exception('Cannot open %r download, recreate', tag_path) - os.unlink(path) - os.unlink(tag_path) - - def get(self, key): - key = json.dumps(key) - if key in self._pool: - return self._pool[key] - - def set(self, key, tag, fetcher, *args, **kwargs): - pool_key = json.dumps(key) - path = join(self._root, hashlib.md5(pool_key).hexdigest()) - - def do_fetch(): - try: - complete = fetcher(*args, path=path, **kwargs) - except Exception: - exception('Error while fetching %r', self) - if exists(path): - os.unlink(path) - return True - with file(path + _TAG_SUFFIX, 'w') as f: - json.dump([key, tag], f) - return complete - - job = coroutine.spawn(do_fetch) - dl = self._pool[pool_key] = _Download(key, tag, path, job) - return dl - - def remove(self, key): - key = json.dumps(key) - if key in self._pool: - self._pool.peek(key).pop() - del self._pool[key] - - -class _Download(dict): - - def __init__(self, key, tag, path, job=None): - self.tag = tag - self._key = key - self._path = path - self._job = job - - def __repr__(self): - return '<Download %r path=%r>' % (self._key, self._path) - - @property - def ready(self): - # pylint: disable-msg=E1101 - return self._job is None or self._job.dead - - @property - def complete(self): - return self._job is not None and self._job.value - - @property - def length(self): - if exists(self._path): - return os.stat(self._path).st_size - - def open(self): - if exists(self._path): - return file(self._path, 'rb') - - def pop(self): - if self._job is not None and not self._job.dead: - _logger.debug('Abort fetching %r', self) - self._job.kill() - - if exists(self._path): - os.unlink(self._path) - if exists(self._path + _TAG_SUFFIX): - os.unlink(self._path + _TAG_SUFFIX) - - _logger.debug('Throw out %r from the pool', self) diff --git a/sugar_network/node/files.py b/sugar_network/node/files.py deleted file mode 100644 index 4fb64ca..0000000 --- a/sugar_network/node/files.py +++ /dev/null @@ -1,183 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import json -import logging -from bisect import bisect_left -from shutil import copyfileobj -from os.path import join, exists, relpath, lexists, dirname - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine - - -_logger = logging.getLogger('node.sync_files') - - -def merge(files_path, packet): - files_path = files_path.rstrip(os.sep) - if not exists(files_path): - os.makedirs(files_path) - commit_seq = None - - for record in packet: - op = record.get('op') - if op == 'update': - path = join(files_path, record['path']) - if not exists(dirname(path)): - os.makedirs(dirname(path)) - with toolkit.new_file(path) as f: - copyfileobj(record['blob'], f) - elif op == 'delete': - path = join(files_path, record['path']) - if lexists(path): - os.unlink(path) - elif op == 'commit': - commit_seq = record['sequence'] - - return commit_seq - - -class Index(object): - - def __init__(self, files_path, index_path, seqno): - self._files_path = files_path.rstrip(os.sep) - self._index_path = index_path - self._seqno = seqno - self._index = [] - self._stamp = 0 - self._mutex = coroutine.Lock() - - if exists(self._index_path): - with file(self._index_path) as f: - self._index, self._stamp = json.load(f) - - if not exists(self._files_path): - os.makedirs(self._files_path) - - def sync(self): - with self._mutex: - return self._sync() - - def diff(self, in_seq, out_seq=None, **kwargs): - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_initial_diff = not out_seq - - # Below calls will trigger coroutine switches, thius, - # avoid changing `self._index` by different coroutines. - with self._mutex: - self._sync() - - _logger.debug('Start sync: in_seq=%r', in_seq) - - files = 0 - deleted = 0 - pos = 0 - - try: - for start, end in in_seq: - pos = bisect_left(self._index, [start, None, None], pos) - for pos, (seqno, path, mtime) in \ - enumerate(self._index[pos:]): - if end is not None and seqno > end: - break - coroutine.dispatch() - if mtime < 0: - yield {'op': 'delete', 'path': path} - deleted += 1 - else: - blob_path = join(self._files_path, path) - yield {'op': 'update', - 'path': path, - 'blob_size': os.stat(blob_path).st_size, - 'blob': toolkit.iter_file(blob_path), - } - out_seq.include(start, seqno) - start = seqno - files += 1 - except StopIteration: - pass - - if is_initial_diff: - # There is only one diff, so, we can stretch it to remove holes - out_seq.stretch() - yield {'op': 'commit', 'sequence': out_seq} - - _logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r', - in_seq, out_seq, files, deleted) - - def _sync(self): - if os.stat(self._files_path).st_mtime <= self._stamp: - return False - - new_files = set() - updates = 0 - deletes = 0 - - # Populate list of new files at first - for root, __, files in os.walk(self._files_path): - coroutine.dispatch() - rel_root = relpath(root, self._files_path) - if rel_root == '.': - rel_root = '' - else: - rel_root += os.sep - for filename in files: - coroutine.dispatch() - path = join(root, filename) - if os.lstat(path).st_mtime > self._stamp: - new_files.add(rel_root + filename) - - # Check for updates for already tracked files - tail = [] - for pos, (__, rel_path, mtime) in enumerate(self._index[:]): - coroutine.dispatch() - path = join(self._files_path, rel_path) - existing = lexists(path) - if existing == (mtime >= 0) and \ - (not existing or os.lstat(path).st_mtime == mtime): - continue - if existing: - new_files.discard(rel_path) - pos -= len(tail) - self._index = self._index[:pos] + self._index[pos + 1:] - tail.append([ - self._seqno.next(), - rel_path, - int(os.lstat(path).st_mtime) if existing else -1, - ]) - if existing: - updates += 1 - else: - deletes += 1 - self._index.extend(tail) - - _logger.debug('Updated %r index: new=%r updates=%r deletes=%r', - self._files_path, len(self._files_path), updates, deletes) - - # Finally, add new files - for rel_path in sorted(new_files): - coroutine.dispatch() - mtime = os.lstat(join(self._files_path, rel_path)).st_mtime - self._index.append([self._seqno.next(), rel_path, mtime]) - - self._stamp = os.stat(self._files_path).st_mtime - if self._seqno.commit(): - with toolkit.new_file(self._index_path) as f: - json.dump((self._index, self._stamp), f) - - return True diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index c7c22e0..61d32fb 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -1,4 +1,4 @@ -# Copyright (C) 2013 Aleksey Lim +# Copyright (C) 2013-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -13,17 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import json -import base64 import logging -from Cookie import SimpleCookie -from os.path import join +from urlparse import urlsplit -from sugar_network import node, toolkit -from sugar_network.node import sync, stats_user, files, model, downloads, obs +from sugar_network import toolkit +from sugar_network.node import obs, master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import http, parcel, pylru, ranges, enforce RESOURCES = ( @@ -33,215 +31,107 @@ RESOURCES = ( 'sugar_network.model.user', ) -_ONE_WAY_DOCUMENTS = ['report'] - _logger = logging.getLogger('node.master') class MasterRoutes(NodeRoutes): - def __init__(self, guid, volume, **kwargs): - NodeRoutes.__init__(self, guid, volume=volume, **kwargs) - - self._pulls = { - 'pull': lambda **kwargs: - ('diff', None, model.diff(self.volume, - ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)), - 'files_pull': lambda **kwargs: - ('files_diff', None, self._files.diff(**kwargs)), - } - - self._pull_queue = downloads.Pool( - join(toolkit.cachedir.value, 'pulls')) - self._files = None - - if node.files_root.value: - self._files = files.Index(node.files_root.value, - join(volume.root, 'files.index'), volume.seqno) - - @route('POST', cmd='sync', - acl=ACL.AUTH) - def sync(self, request): - reply, cookie = self._push(sync.decode(request.content_stream)) - exclude_seq = None - if len(cookie.sent) == 1: - exclude_seq = cookie.sent.values()[0] - for op, layer, seq in cookie: - reply.append(self._pulls[op](in_seq=seq, - exclude_seq=exclude_seq, layer=layer)) - return sync.encode(reply, src=self.guid) + def __init__(self, **kwargs): + NodeRoutes.__init__(self, urlsplit(master_api.value).netloc, **kwargs) + self._pulls = pylru.lrucache(1024) + + @route('POST', cmd='sync', arguments={'accept_length': int}) + def sync(self, accept_length): + return parcel.encode(self._push() + (self._pull() or []), + limit=accept_length, header={'from': self.guid}, + on_complete=this.cookie.clear) @route('POST', cmd='push') - def push(self, request, response): - reply, cookie = self._push(sync.package_decode(request.content_stream)) - # Read passed cookie only after excluding `merged_seq`. - # If there is `pull` out of currently pushed packet, excluding - # `merged_seq` should not affect it. - cookie.update(_Cookie(request)) - cookie.store(response) - return sync.package_encode(reply, src=self.guid) - - @route('GET', cmd='pull', - mime_type='application/octet-stream', - arguments={'accept_length': int}) - def pull(self, request, response, accept_length=None): - cookie = _Cookie(request) - if not cookie: - _logger.warning('Requested full dump in pull command') - cookie.append(('pull', None, toolkit.Sequence([[1, None]]))) - cookie.append(('files_pull', None, toolkit.Sequence([[1, None]]))) - - exclude_seq = None - if len(cookie.sent) == 1: - exclude_seq = toolkit.Sequence(cookie.sent.values()[0]) - - reply = None - for pull_key in cookie: - op, layer, seq = pull_key - - pull = self._pull_queue.get(pull_key) - if pull is not None: - if not pull.ready: - continue - if not pull.tag: - self._pull_queue.remove(pull_key) - cookie.remove(pull_key) - continue - if accept_length is None or pull.length <= accept_length: - _logger.debug('Found ready to use %r', pull) - if pull.complete: - cookie.remove(pull_key) - else: - seq.exclude(pull.tag) - reply = pull.open() - break - _logger.debug('Existing %r is too big, will recreate', pull) - self._pull_queue.remove(pull_key) - - out_seq = toolkit.Sequence() - pull = self._pull_queue.set(pull_key, out_seq, - sync.sneakernet_encode, - [self._pulls[op](in_seq=seq, out_seq=out_seq, - exclude_seq=exclude_seq, layer=layer, - fetch_blobs=True)], - limit=accept_length, src=self.guid) - _logger.debug('Start new %r', pull) + def push(self): + return parcel.encode(self._push(), header={'from': self.guid}) + @route('GET', cmd='pull', arguments={'accept_length': int}) + def pull(self, accept_length): + reply = self._pull() if reply is None: - if cookie: - _logger.debug('No ready pulls') - # TODO Might be useful to set meaningful value here - cookie.delay = node.pull_timeout.value - else: - _logger.debug('Nothing to pull') - - cookie.store(response) - return reply + return None + return parcel.encode(reply, limit=accept_length, + header={'from': self.guid}, on_complete=this.cookie.clear) @route('PUT', ['context', None], cmd='presolve', acl=ACL.AUTH, mime_type='application/json') def presolve(self, request): - enforce(node.files_root.value, http.BadRequest, 'Disabled') - aliases = self.volume['context'].get(request.guid)['aliases'] + aliases = this.volume['context'].get(request.guid)['aliases'] enforce(aliases, http.BadRequest, 'Nothing to presolve') - return obs.presolve(None, aliases, node.files_root.value) + return obs.presolve(None, aliases, this.volume.blobs.path('packages')) def status(self): result = NodeRoutes.status(self) - result['level'] = 'master' + result['mode'] = 'master' return result - def _push(self, stream): + def _push(self): + cookie = this.cookie reply = [] - cookie = _Cookie() - - for packet in stream: - src = packet['src'] - enforce(packet['dst'] == self.guid, 'Misaddressed packet') - - if packet.name == 'pull': - pull_seq = cookie['pull', packet['layer'] or None] - pull_seq.include(packet['sequence']) - cookie.sent.setdefault(src, toolkit.Sequence()) - elif packet.name == 'files_pull': - if self._files is not None: - cookie['files_pull'].include(packet['sequence']) - elif packet.name == 'diff': - seq, ack_seq = model.merge(self.volume, packet) - reply.append(('ack', { - 'ack': ack_seq, - 'sequence': seq, - 'dst': src, - }, None)) - sent_seq = cookie.sent.setdefault(src, toolkit.Sequence()) - sent_seq.include(ack_seq) - elif packet.name == 'stats_diff': - reply.append(('stats_ack', { - 'sequence': stats_user.merge(packet), - 'dst': src, - }, None)) - - return reply, cookie - - -class _Cookie(list): - - def __init__(self, request=None): - list.__init__(self) - - self.sent = {} - self.delay = 0 - - if request is not None: - self.update(self._get_cookie(request, 'sugar_network_pull') or []) - self.sent = self._get_cookie(request, 'sugar_network_sent') or {} - - def __repr__(self): - return '<Cookie pull=%s sent=%r>' % (list.__repr__(self), self.sent) - - def update(self, that): - for op, layer, seq in that: - self[op, layer].include(seq) - - def store(self, response): - response.set('set-cookie', []) - if self: - _logger.debug('Postpone %r in cookie', self) - self._set_cookie(response, 'sugar_network_pull', - base64.b64encode(json.dumps(self))) - self._set_cookie(response, 'sugar_network_sent', - base64.b64encode(json.dumps(self.sent))) - self._set_cookie(response, 'sugar_network_delay', self.delay) + + for packet in parcel.decode( + this.request.content_stream, this.request.content_length): + sender = packet['from'] + enforce(packet['to'] == self.guid, http.BadRequest, + 'Misaddressed packet') + if packet.name == 'push': + seqno, push_r = this.volume.patch(packet) + ack_r = [] if seqno is None else [[seqno, seqno]] + ack = {'ack': ack_r, 'ranges': push_r, 'to': sender} + reply.append(('ack', ack, None)) + cookie.setdefault('ack', {}) \ + .setdefault(sender, []) \ + .append((push_r, ack_r)) + elif packet.name == 'pull': + cookie.setdefault('ack', {}).setdefault(sender, []) + ranges.include(cookie.setdefault('pull', []), packet['ranges']) + elif packet.name == 'request': + cookie.setdefault('request', []).append(packet.header) + + return reply + + def _pull(self): + processed = this.cookie.get('id') + if processed in self._pulls: + cookie = this.cookie = self._pulls[processed] + if not cookie: + return None else: - self._unset_cookie(response, 'sugar_network_pull') - self._unset_cookie(response, 'sugar_network_sent') - self._unset_cookie(response, 'sugar_network_delay') - - def __getitem__(self, key): - if not isinstance(key, tuple): - key = (key, None) - for op, layer, seq in self: - if (op, layer) == key: - return seq - seq = toolkit.Sequence() - self.append(key + (seq,)) - return seq - - def _get_cookie(self, request, name): - cookie_str = request.environ.get('HTTP_COOKIE') - if not cookie_str: - return - cookie = SimpleCookie() - cookie.load(cookie_str) - if name not in cookie: - return - value = cookie.get(name).value - if value != 'unset_%s' % name: - return json.loads(base64.b64decode(value)) - - def _set_cookie(self, response, name, value, age=3600): - cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age) - response.get('set-cookie').append(cookie) - - def _unset_cookie(self, response, name): - self._set_cookie(response, name, 'unset_%s' % name, 0) + cookie = this.cookie + cookie['id'] = toolkit.uuid() + self._pulls[cookie['id']] = cookie + + pull_r = cookie.get('pull') + if not pull_r: + return [] + + reply = [] + exclude = [] + acks = cookie.get('ack') + if acks: + acked = {} + for req in cookie.get('request') or []: + ack_r = None + for push_r, ack_r in acks.get(req['origin']) or []: + if req['ranges'] == push_r: + break + else: + continue + ranges.include(acked.setdefault(req['from'], []), ack_r) + reply.append(('ack', {'to': req['from'], 'ack': ack_r}, [])) + for node, ack_ranges in acks.items(): + acked_r = acked.setdefault(node, []) + for __, i in ack_ranges: + ranges.include(acked_r, i) + r = reduce(lambda x, y: ranges.intersect(x, y), acked.values()) + ranges.include(exclude, r) + + push = this.volume.diff(pull_r, exclude, one_way=True, files=['']) + reply.append(('push', None, push)) + + return reply diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 9ead01c..90e50c2 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -16,7 +16,7 @@ import bisect import logging -from sugar_network import db, toolkit +from sugar_network import db from sugar_network.model import Release, context as base_context from sugar_network.node import obs from sugar_network.toolkit.router import ACL @@ -105,71 +105,6 @@ class Context(base_context.Context): return value -def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, - ignore_documents=None, **kwargs): - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_the_only_seq = not out_seq - if layer: - if isinstance(layer, basestring): - layer = [layer] - layer.append('common') - try: - for resource, directory in volume.items(): - if ignore_documents and resource in ignore_documents: - continue - coroutine.dispatch() - directory.commit() - yield {'resource': resource} - for guid, patch in directory.diff(in_seq, exclude_seq, - layer=layer if resource == 'context' else None): - adiff = {} - adiff_seq = toolkit.Sequence() - for prop, meta, seqno in patch: - adiff[prop] = meta - adiff_seq.include(seqno, seqno) - if adiff: - yield {'guid': guid, 'diff': adiff} - out_seq.include(adiff_seq) - if is_the_only_seq: - # There is only one diff, so, we can stretch it to remove all holes - out_seq.stretch() - except StopIteration: - pass - - yield {'commit': out_seq} - - -def merge(volume, records): - directory = None - commit_seq = toolkit.Sequence() - merged_seq = toolkit.Sequence() - synced = False - - for record in records: - resource_ = record.get('resource') - if resource_: - directory = volume[resource_] - continue - - if 'guid' in record: - seqno, merged = directory.merge(**record) - synced = synced or merged - if seqno is not None: - merged_seq.include(seqno, seqno) - continue - - commit = record.get('commit') - if commit is not None: - commit_seq.include(commit) - continue - - if synced: - this.broadcast({'event': 'sync'}) - - return commit_seq, merged_seq - - def solve(volume, top_context, lsb_id=None, lsb_release=None, stability=None, requires=None): top_context = volume['context'][top_context] diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 6d5d200..86e4ce1 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -18,14 +18,13 @@ import time import logging import hashlib from ConfigParser import ConfigParser -from os.path import join, isdir, exists +from os.path import join, exists -from sugar_network import db, node, toolkit -from sugar_network.db import blobs +from sugar_network import db, node from sugar_network.model import FrontRoutes, load_bundle -from sugar_network.node import stats_user, model +from sugar_network.node import model # pylint: disable-msg=W0611 -from sugar_network.toolkit.router import route, preroute, postroute, ACL +from sugar_network.toolkit.router import route, preroute, postroute, ACL, File from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute from sugar_network.toolkit.spec import parse_requires, parse_version from sugar_network.toolkit.bundle import Bundle @@ -53,10 +52,6 @@ class NodeRoutes(db.Routes, FrontRoutes): def guid(self): return self._guid - @route('GET', cmd='logon', acl=ACL.AUTH) - def logon(self): - pass - @route('GET', cmd='whoami', mime_type='application/json') def whoami(self, request, response): roles = [] @@ -66,7 +61,7 @@ class NodeRoutes(db.Routes, FrontRoutes): @route('GET', cmd='status', mime_type='application/json') def status(self): - return {'guid': self._guid, + return {'guid': self.guid, 'seqno': { 'db': self.volume.seqno.value, 'releases': self.volume.releases_seqno.value, @@ -80,49 +75,39 @@ class NodeRoutes(db.Routes, FrontRoutes): @fallbackroute('GET', ['packages']) def route_packages(self, request, response): - enforce(node.files_root.value, http.BadRequest, 'Disabled') - - if request.path and request.path[-1] == 'updates': - root = join(node.files_root.value, *request.path[:-1]) - enforce(isdir(root), http.NotFound, 'Directory was not found') + path = this.request.path + if path and path[-1] == 'updates': result = [] last_modified = 0 - for filename in os.listdir(root): - if '.' in filename: + for blob in this.volume.blobs.diff( + [[this.request.if_modified_since + 1, None]], + join(*path[:-1]), recursive=False): + if '.' in blob.name: continue - path = join(root, filename) - mtime = int(os.stat(path).st_mtime) - if mtime > request.if_modified_since: - result.append(filename) - last_modified = max(last_modified, mtime) + result.append(blob.name) + last_modified = max(last_modified, blob.mtime) response.content_type = 'application/json' if last_modified: response.last_modified = last_modified return result - path = join(node.files_root.value, *request.path) - enforce(exists(path), http.NotFound, 'File was not found') - if not isdir(path): - return toolkit.iter_file(path) - - result = [] - for filename in os.listdir(path): - if filename.endswith('.rpm') or filename.endswith('.deb'): - continue - result.append(filename) - - response.content_type = 'application/json' - return result + blob = this.volume.blobs.get(join(*path)) + if isinstance(blob, File): + return blob + else: + response.content_type = 'application/json' + return [i.name for i in blob if '.' not in i.name] @route('POST', ['context'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) - def submit_release(self, request, initial): - blob = blobs.post(request.content_stream, request.content_type) + def submit_release(self, initial): + blob = this.volume.blobs.post( + this.request.content_stream, this.request.content_type) try: context, release = load_bundle(blob, initial=initial) except Exception: - blobs.delete(blob.digest) + this.volume.blobs.delete(blob.digest) raise this.call(method='POST', path=['context', context, 'releases'], content_type='application/json', content=release) @@ -156,29 +141,7 @@ class NodeRoutes(db.Routes, FrontRoutes): arguments={'requires': list}) def get_clone(self, request, response): solution = self.solve(request) - return blobs.get(solution[request.guid]['blob']) - - @route('GET', ['user', None], cmd='stats-info', - mime_type='application/json', acl=ACL.AUTH) - def user_stats_info(self, request): - status = {} - for rdb in stats_user.get_rrd(request.guid): - status[rdb.name] = rdb.last + stats_user.stats_user_step.value - - # TODO Process client configuration in more general manner - return {'enable': True, - 'step': stats_user.stats_user_step.value, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'status': status, - } - - @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH) - def user_stats_upload(self, request): - name = request.content['name'] - values = request.content['values'] - rrd = stats_user.get_rrd(request.guid) - for timestamp, values in values: - rrd[name].put(values, timestamp) + return this.volume.blobs.get(solution[request.guid]['blob']) @preroute def preroute(self, op, request, response): @@ -204,7 +167,7 @@ class NodeRoutes(db.Routes, FrontRoutes): def on_create(self, request, props): if request.resource == 'user': - with file(blobs.get(props['pubkey']).path) as f: + with file(this.volume.blobs.get(props['pubkey']).path) as f: props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) db.Routes.on_create(self, request, props) @@ -223,7 +186,7 @@ class NodeRoutes(db.Routes, FrontRoutes): from M2Crypto import RSA pubkey = self.volume['user'][auth.login]['pubkey'] - key = RSA.load_pub_key(blobs.get(pubkey).path) + key = RSA.load_pub_key(this.volume.blobs.get(pubkey).path) data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() enforce(key.verify(data, auth.signature.decode('hex')), http.Forbidden, 'Bad credentials') diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 2d60ea8..333e6ea 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,13 +21,12 @@ from urlparse import urlsplit from os.path import join, dirname, exists, isabs from gettext import gettext as _ -from sugar_network import node, toolkit -from sugar_network.client import api_url -from sugar_network.node import sync, stats_user, files, model +from sugar_network import toolkit +from sugar_network.node import master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit import http, parcel, ranges, enforce _logger = logging.getLogger('node.slave') @@ -35,148 +34,94 @@ _logger = logging.getLogger('node.slave') class SlaveRoutes(NodeRoutes): - def __init__(self, key_path, volume_): - self._auth = http.SugarAuth(key_path) - NodeRoutes.__init__(self, self._auth.login, volume_) - - self._push_seq = toolkit.PersistentSequence( - join(volume_.root, 'push.sequence'), [1, None]) - self._pull_seq = toolkit.PersistentSequence( - join(volume_.root, 'pull.sequence'), [1, None]) - self._files_seq = toolkit.PersistentSequence( - join(volume_.root, 'files.sequence'), [1, None]) - self._master_guid = urlsplit(api_url.value).netloc - self._offline_session = None - - @route('POST', cmd='online-sync', acl=ACL.LOCAL) + def __init__(self, volume, **kwargs): + self._creds = http.SugarAuth( + join(volume.root, 'etc', 'private', 'node')) + NodeRoutes.__init__(self, self._creds.login, volume=volume, **kwargs) + vardir = join(volume.root, 'var') + self._push_r = toolkit.Bin(join(vardir, 'push.ranges'), [[1, None]]) + self._pull_r = toolkit.Bin(join(vardir, 'pull.ranges'), [[1, None]]) + self._master_guid = urlsplit(master_api.value).netloc + + @route('POST', cmd='online_sync', acl=ACL.LOCAL, + arguments={'no_pull': bool}) def online_sync(self, no_pull=False): - conn = http.Connection(api_url.value, auth=self._auth) - - # TODO `http.Connection` should handle re-POSTing without - # loosing payload after authentication - conn.get(cmd='logon') - - push = [('diff', None, model.diff(self.volume, self._push_seq))] - if not no_pull: - push.extend([ - ('pull', { - 'sequence': self._pull_seq, - 'layer': node.sync_layers.value, - }, None), - ('files_pull', {'sequence': self._files_seq}, None), - ]) - if stats_user.stats_user.value: - push.append(('stats_diff', None, stats_user.diff())) + self._export(not no_pull) + conn = http.Connection(master_api.value) response = conn.request('POST', - data=sync.encode(push, src=self.guid, dst=self._master_guid), + data=parcel.encode(self._export(not no_pull), header={ + 'from': self.guid, + 'to': self._master_guid, + }), params={'cmd': 'sync'}, headers={'Transfer-Encoding': 'chunked'}) - self._import(sync.decode(response.raw), None) + self._import(parcel.decode(response.raw)) - @route('POST', cmd='offline-sync', acl=ACL.LOCAL) + @route('POST', cmd='offline_sync', acl=ACL.LOCAL) def offline_sync(self, path): - enforce(node.sync_layers.value, - '--sync-layers is not specified, the full master dump ' - 'might be too big and should be limited') - enforce(isabs(path), 'Argument \'path\' should be an absolute path') - - _logger.debug('Start %r synchronization session in %r', - self._offline_session, path) + enforce(isabs(path), "Argument 'path' is not an absolute path") + _logger.debug('Start offline synchronization in %r', path) if not exists(path): os.makedirs(path) - try: - self._offline_session = self._offline_sync(path, - **(self._offline_session or {})) - except Exception: - toolkit.exception(_logger, 'Failed to complete synchronization') - self._offline_session = None - raise - - if self._offline_session is None: - _logger.debug('Synchronization completed') - else: - _logger.debug('Postpone synchronization with %r session', - self._offline_session) - - def status(self): - result = NodeRoutes.status(self) - result['level'] = 'slave' - return result - - def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None): - push = [] - - if push_seq is None: - push_seq = toolkit.Sequence(self._push_seq) - if stats_seq is None: - stats_seq = {} - if session is None: - session = toolkit.uuid() - push.append(('pull', { - 'sequence': self._pull_seq, - 'layer': node.sync_layers.value, - }, None)) - push.append(('files_pull', {'sequence': self._files_seq}, None)) - this.broadcast({ 'event': 'sync_progress', 'progress': _('Reading sneakernet packages'), }) - self._import(sync.sneakernet_decode(path), push_seq) - - offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') - if exists(offline_script): - shutil.copy(offline_script, path) + requests = self._import(parcel.decode_dir(path)) this.broadcast({ 'event': 'sync_progress', 'progress': _('Generating new sneakernet package'), }) + offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') + if exists(offline_script): + shutil.copy(offline_script, path) + parcel.encode_dir(requests + self._export(True), root=path, header={ + 'from': self.guid, + 'to': self._master_guid, + }) + + _logger.debug('Synchronization completed') + + def status(self): + result = NodeRoutes.status(self) + result['mode'] = 'slave' + return result + + def _import(self, package): + requests = [] - diff_seq = toolkit.Sequence([]) - push.append(('diff', None, - model.diff(self.volume, push_seq, diff_seq))) - if stats_user.stats_user.value: - push.append(('stats_diff', None, stats_user.diff(stats_seq))) - complete = sync.sneakernet_encode(push, root=path, - src=self.guid, dst=self._master_guid, api_url=api_url.value, - session=session) - if not complete: - push_seq.exclude(diff_seq) - return {'push_seq': push_seq, - 'stats_seq': stats_seq, - 'session': session, - } - - def _import(self, package, push_seq): for packet in package: - from_master = (packet['src'] == self._master_guid) - addressed = (packet['dst'] == self.guid) - - if packet.name == 'diff': - _logger.debug('Processing %r', packet) - seq, __ = model.merge(self.volume, packet, shift_seqno=False) - if from_master and seq: - self._pull_seq.exclude(seq) - self._pull_seq.commit() - - elif from_master: - if packet.name == 'ack' and addressed: - _logger.debug('Processing %r', packet) - if push_seq: - push_seq.exclude(packet['sequence']) - self._pull_seq.exclude(packet['ack']) - self._pull_seq.commit() - self._push_seq.exclude(packet['sequence']) - self._push_seq.commit() - elif packet.name == 'stats_ack' and addressed: - _logger.debug('Processing %r', packet) - stats_user.commit(packet['sequence']) - elif packet.name == 'files_diff': - _logger.debug('Processing %r', packet) - seq = files.merge(node.files_root.value, packet) - if seq: - self._files_seq.exclude(seq) - self._files_seq.commit() + sender = packet['from'] + from_master = (sender == self._master_guid) + if packet.name == 'push': + seqno, committed = this.volume.patch(packet) + if seqno is not None: + if from_master: + ranges.exclude(self._pull_r.value, committed) + self._pull_r.commit() + else: + requests.append(('request', { + 'origin': sender, + 'ranges': committed, + }, [])) + ranges.exclude(self._push_r.value, seqno, seqno) + self._push_r.commit() + elif packet.name == 'ack' and from_master and \ + packet['to'] == self.guid: + ranges.exclude(self._pull_r.value, packet['ack']) + self._pull_r.commit() + if packet['ranges']: + ranges.exclude(self._push_r.value, packet['ranges']) + self._push_r.commit() + + return requests + + def _export(self, pull): + export = [] + if pull: + export.append(('pull', {'ranges': self._pull_r.value}, None)) + export.append(('push', None, self.volume.diff(self._push_r.value))) + return export diff --git a/sugar_network/node/stats_user.py b/sugar_network/node/stats_user.py deleted file mode 100644 index 0b96449..0000000 --- a/sugar_network/node/stats_user.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import logging -from os.path import join, exists, isdir - -from sugar_network import node, toolkit -from sugar_network.toolkit.rrd import Rrd -from sugar_network.toolkit import Option, pylru, enforce - - -stats_user = Option( - 'accept personalized users statistics', - default=False, type_cast=Option.bool_cast, action='store_true') - -stats_user_step = Option( - 'step interval in seconds for users\' RRD databases', - default=60, type_cast=int) - -stats_user_rras = Option( - 'comma separated list of RRAs for users\' RRD databases', - default=[ - 'RRA:AVERAGE:0.5:1:4320', # one day with 60s step - 'RRA:AVERAGE:0.5:5:2016', # one week with 5min step - ], - type_cast=Option.list_cast, type_repr=Option.list_repr) - -_logger = logging.getLogger('node.stats_user') -_user_cache = pylru.lrucache(32) - - -def get_rrd(user): - if user in _user_cache: - return _user_cache[user] - else: - rrd = _user_cache[user] = Rrd(_rrd_path(user), - stats_user_step.value, stats_user_rras.value) - return rrd - - -def diff(in_info=None): - if in_info is None: - in_info = {} - out_info = {} - - try: - for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')): - in_info.setdefault(user, {}) - out_info.setdefault(user, {}) - - for db in rrd: - yield {'db': db.name, 'user': user} - - in_seq = in_info[user].get(db.name) - if in_seq is None: - in_seq = toolkit.PersistentSequence( - join(rrd.root, db.name + '.push'), [1, None]) - in_info[user][db.name] = in_seq - elif in_seq is not toolkit.Sequence: - in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq) - out_seq = out_info[user].setdefault(db.name, - toolkit.Sequence()) - - for start, end in in_seq: - for timestamp, values in \ - db.get(max(start, db.first), end or db.last): - yield {'timestamp': timestamp, 'values': values} - in_seq.exclude(start, timestamp) - out_seq.include(start, timestamp) - start = timestamp - except StopIteration: - pass - - yield {'commit': out_info} - - -def merge(packet): - db = None - seq = None - - for record in packet: - if 'db' in record: - db = get_rrd(record['user'])[record['db']] - elif 'commit' in record: - seq = record['commit'] - else: - enforce(db is not None, 'Malformed user stats diff') - db.put(record['values'], record['timestamp']) - - return seq - - -def commit(info): - for user, dbs in info.items(): - for db_name, merged in dbs.items(): - seq = toolkit.PersistentSequence( - _rrd_path(user, db_name + '.push'), [1, None]) - seq.exclude(merged) - seq.commit() - - -def _walk_rrd(root): - if not exists(root): - return - for users_dirname in os.listdir(root): - users_dir = join(root, users_dirname) - if not isdir(users_dir): - continue - for user in os.listdir(users_dir): - yield user, Rrd(join(users_dir, user), stats_user_step.value) - - -def _rrd_path(user, *args): - return join(node.stats_root.value, 'user', user[:2], user, *args) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index d3d9b88..89a9d0f 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -523,7 +523,7 @@ class Bin(object): json.dump(self.value, f) f.flush() os.fsync(f.fileno()) - self._orig_value = self.value + self._orig_value = deepcopy(self.value) return True def __enter__(self): diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index d280035..254e6f3 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -112,8 +112,8 @@ class Connection(object): _Session = None - def __init__(self, api_url='', auth=None, max_retries=0, **session_args): - self.api_url = api_url + def __init__(self, api='', auth=None, max_retries=0, **session_args): + self.api = api self.auth = auth self._max_retries = max_retries self._session_args = session_args @@ -121,7 +121,7 @@ class Connection(object): self._nonce = None def __repr__(self): - return '<Connection api_url=%s>' % self.api_url + return '<Connection api=%s>' % self.api def __enter__(self): return self @@ -203,7 +203,7 @@ class Connection(object): if not path: path = [''] if not isinstance(path, basestring): - path = '/'.join([i.strip('/') for i in [self.api_url] + path]) + path = '/'.join([i.strip('/') for i in [self.api] + path]) try_ = 0 while True: @@ -283,7 +283,7 @@ class Connection(object): break path = reply.headers['location'] if path.startswith('/'): - path = self.api_url + path + path = self.api + path if request.method != 'HEAD': if reply.headers.get('Content-Type') == 'application/json': @@ -380,7 +380,7 @@ class SugarAuth(object): key_dir = dirname(self._key_path) if exists(self._key_path): - if os.stat(key_dir) & 077: + if os.stat(key_dir).st_mode & 077: os.chmod(key_dir, 0700) self._key = RSA.load_key(self._key_path) return @@ -432,7 +432,7 @@ class _Subscription(object): if try_ == 0: raise toolkit.exception('Failed to read from %r subscription, ' - 'will resubscribe', self._client.api_url) + 'will resubscribe', self._client.api) self._content = None return _parse_event(line) @@ -441,7 +441,7 @@ class _Subscription(object): return self._content params.update(self._condition) params['cmd'] = 'subscribe' - _logger.debug('Subscribe to %r, %r', self._client.api_url, params) + _logger.debug('Subscribe to %r, %r', self._client.api, params) response = self._client.request('GET', params=params) self._content = response.raw return self._content diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py deleted file mode 100644 index bafe6d1..0000000 --- a/sugar_network/toolkit/rrd.py +++ /dev/null @@ -1,296 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Convenient access to RRD databases.""" - -import re -import os -import time -import json -import bisect -import logging -from os.path import exists, join, splitext - - -_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$') -_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$') - -_FETCH_PAGE = 256 - -_logger = logging.getLogger('rrd') -_rrdtool = None - - -class Rrd(object): - - def __init__(self, root, step, rras=None): - global _rrdtool - - import rrdtool - _rrdtool = rrdtool - - self._root = root - self._step = step - # rrdtool knows nothing about `unicode` - self._rras = [i.encode('utf8') for i in rras or []] - self._dbsets = {} - - if not exists(self._root): - os.makedirs(self._root) - - for filename in os.listdir(self._root): - match = _DB_FILENAME_RE.match(filename) - if match is not None: - name, revision = match.groups() - self.get(name).load(filename, int(revision or 0)) - - def __iter__(self): - for i in self._dbsets.values(): - yield i - - def __getitem__(self, name): - return self.get(name) - - def __contains__(self, name): - return name in self._dbsets - - @property - def root(self): - return self._root - - @property - def step(self): - return self._step - - def get(self, name): - db = self._dbsets.get(name) - if db is None: - db = _DbSet(self._root, name, self._step, self._rras) - self._dbsets[name] = db - return db - - -class _DbSet(object): - - def __init__(self, root, name, step, rras): - self._root = root - self.name = name - self._step = step - self._rras = rras - self._revisions = [] - self._fields = None - self._field_names = None - self.__db = None - - @property - def fields(self): - return self._field_names - - @fields.setter - def fields(self, fields): - self._field_names = fields.keys() - self._field_names.sort() - self._fields = [str(fields[i]) for i in self._field_names] - _logger.debug('Set %r fields for %r', self._fields, self.name) - - @property - def first(self): - if not self._revisions: - return - return self._revisions[0].first - - @property - def last(self): - if not self._revisions: - return - return self._revisions[-1].last - - @property - def last_ds(self): - if not self._revisions or not self._field_names: - return {} - info = _rrdtool.info(self._revisions[-1].path) - result = {} - for field in self._field_names: - result[field] = float(info.get('ds[%s].last_ds' % field) or 0) - return result - - def load(self, filename, revision): - _logger.debug('Load %s database from %s with revision %s', - filename, self._root, revision) - db = _Db(join(self._root, filename), revision) - bisect.insort(self._revisions, db) - return db - - def put(self, values, timestamp=None): - if not self.fields: - _logger.debug('Parse fields from the first put') - self.fields = dict([ - (i, 'DS:%s:GAUGE:%s:U:U' % (i, self._step * 2)) - for i in values]) - - if not timestamp: - timestamp = int(time.time()) - timestamp = timestamp / self._step * self._step - - db = self._get_db(timestamp) - if db is None: - return - - if timestamp <= db.last: - _logger.warning('Database %s updated at %s, %s in the past', - db.path, db.last, timestamp) - return - - value = [str(timestamp)] - for name in self._field_names: - value.append(str(values[name])) - - _logger.debug('Put %r to %s', value, db.path) - - db.put(':'.join(value), timestamp) - - def get(self, start=None, end=None, resolution=None): - if not self._revisions: - return - - if not resolution: - resolution = self._step - - if start is None: - start = self._revisions[0].first - if end is None: - end = self._revisions[-1].last - - revisions = [] - for db in reversed(self._revisions): - revisions.append(db) - if db.last <= start: - break - - start = start - start % self._step - self._step - last = min(end, start + _FETCH_PAGE * resolution) - last -= last % self._step + self._step - - for db in reversed(revisions): - db_end = min(last, db.last - self._step) - if start > db_end: - break - (row_start, start, row_step), __, rows = _rrdtool.fetch( - str(db.path), - 'AVERAGE', - '--start', str(start), - '--end', str(db_end), - '--resolution', str(resolution)) - for raw_row in rows: - row_start += row_step - if row_start > end: - break - row = {} - for i, value in enumerate(raw_row): - row[db.field_names[i]] = value or .0 - yield row_start, row - start = db_end + 1 - - def _get_db(self, timestamp): - if self.__db is None and self._fields: - if self._revisions: - db = self._revisions[-1] - if db.last >= timestamp: - _logger.warning( - 'Database %s updated at %s, %s in the past', - db.path, db.last, timestamp) - return None - if db.step != self._step or db.rras != self._rras or \ - db.field_names != self._field_names: - db = self._create_db(db.revision + 1, db.last) - else: - db = self._create_db(0, timestamp) - self.__db = db - return self.__db - - def _create_db(self, revision, timestamp): - filename = self.name - if revision: - filename += '-%s' % revision - filename += '.rrd' - - _logger.debug('Create %s database in %s start=%s step=%s', - filename, self._root, timestamp, self._step) - - _rrdtool.create( - str(join(self._root, filename)), - '--start', str(timestamp - self._step), - '--step', str(self._step), - *(self._fields + self._rras)) - - return self.load(filename, revision) - - -class _Db(object): - - def __init__(self, path, revision=0): - self.path = str(path) - self._meta_path = splitext(path)[0] + '.meta' - self.revision = revision - self.fields = [] - self.field_names = [] - self.rras = [] - - info = _rrdtool.info(self.path) - self.step = info['step'] - self.first = 0 - self.last = info['last_update'] - - fields = {} - rras = {} - - for key, value in info.items(): - match = _INFO_RE.match(key) - if match is None: - continue - prefix, key, prop = match.groups() - if prefix == 'ds': - fields.setdefault(key, {}) - fields[key][prop] = value - if prefix == 'rra': - rras.setdefault(key, {}) - rras[key][prop] = value - - for index in sorted([int(i) for i in rras.keys()]): - rra = rras[str(index)] - self.rras.append( - 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra) - - for name in sorted(fields.keys()): - props = fields[name] - props['name'] = name - self.fields.append(props) - self.field_names.append(name) - - if exists(self._meta_path): - with file(self._meta_path) as f: - self.first = json.load(f).get('first') - - def put(self, value, timestamp): - if not self.first: - with file(self._meta_path, 'w') as f: - json.dump({'first': timestamp}, f) - self.first = timestamp - _rrdtool.update(self.path, str(value)) - self.last = _rrdtool.info(self.path)['last_update'] - - def __cmp__(self, other): - return cmp(self.revision, other.revision) diff --git a/tests/__init__.py b/tests/__init__.py index cc9ec01..dc10cdb 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -27,7 +27,7 @@ from sugar_network.model.user import User from sugar_network.model.context import Context from sugar_network.model.post import Post from sugar_network.node.master import MasterRoutes -from sugar_network.node import stats_user, obs, slave +from sugar_network.node import obs, slave from requests import adapters @@ -87,11 +87,11 @@ class Test(unittest.TestCase): db.index_flush_threshold.value = 1 node.find_limit.value = 1024 node.data_root.value = tmpdir - node.stats_root.value = tmpdir + '/stats' node.port.value = 8888 + node.master_api.value = 'http://127.0.0.1:7777' db.index_write_queue.value = 10 client.local_root.value = tmpdir - client.api_url.value = 'http://127.0.0.1:8888' + client.api.value = 'http://127.0.0.1:7777' client.mounts_root.value = None client.ipc_port.value = 5555 client.layers.value = None @@ -103,9 +103,6 @@ class Test(unittest.TestCase): mountpoints._connects.clear() mountpoints._found.clear() mountpoints._COMPLETE_MOUNT_TIMEOUT = .1 - stats_user.stats_user.value = False - stats_user.stats_user_step.value = 1 - stats_user._user_cache.clear() obs._conn = None obs._repos = {'base': [], 'presolve': []} http._RECONNECTION_NUMBER = 0 @@ -265,10 +262,11 @@ class Test(unittest.TestCase): def start_master(self, classes=None, routes=MasterRoutes): if classes is None: classes = [User, Context, Post] + #self.touch(('master/etc/private/node', file(join(root, 'data', NODE_UID)).read())) self.node_volume = db.Volume('master', classes) - self.node_routes = routes('master', volume=self.node_volume) + self.node_routes = routes(volume=self.node_volume) self.node_router = Router(self.node_routes) - self.node = coroutine.WSGIServer(('127.0.0.1', 8888), self.node_router) + self.node = coroutine.WSGIServer(('127.0.0.1', 7777), self.node_router) coroutine.spawn(self.node.serve_forever) coroutine.dispatch(.1) this.volume = self.node_volume @@ -281,8 +279,7 @@ class Test(unittest.TestCase): def node(): volume = db.Volume('master', classes) - self.node_routes = routes('guid', volume) - node = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.node_routes)) + node = coroutine.WSGIServer(('127.0.0.1', 7777), Router(routes(volume=volume))) node.serve_forever() pid = self.fork(node) @@ -293,7 +290,7 @@ class Test(unittest.TestCase): if classes is None: classes = [User, Context] volume = db.Volume('client', classes) - self.client_routes = routes(volume, client.api_url.value) + self.client_routes = routes(volume, client.api.value) self.client = coroutine.WSGIServer( ('127.0.0.1', client.ipc_port.value), Router(self.client_routes)) coroutine.spawn(self.client.serve_forever) @@ -306,7 +303,7 @@ class Test(unittest.TestCase): classes = [User, Context] self.start_master(classes) volume = db.Volume('client', classes) - self.client_routes = ClientRoutes(volume, client.api_url.value) + self.client_routes = ClientRoutes(volume, client.api.value) self.wait_for_events(self.client_routes, event='inline', state='online').wait() self.client = coroutine.WSGIServer( ('127.0.0.1', client.ipc_port.value), Router(self.client_routes)) @@ -324,33 +321,6 @@ class Test(unittest.TestCase): this.volume = self.home_volume return IPCConnection() - def restful_server(self, classes=None): - if not exists('remote'): - os.makedirs('remote') - - logfile = file('remote/log', 'a') - sys.stdout = sys.stderr = logfile - - for handler in logging.getLogger().handlers: - logging.getLogger().removeHandler(handler) - logging.basicConfig(level=logging.DEBUG) - - db.index_flush_timeout.value = 0 - db.index_flush_threshold.value = 1 - node.find_limit.value = 1024 - db.index_write_queue.value = 10 - - volume = db.Volume('remote', classes or [User, Context]) - self.node_routes = MasterRoutes('guid', volume) - httpd = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.node_routes)) - try: - coroutine.joinall([ - coroutine.spawn(httpd.serve_forever), - ]) - finally: - httpd.stop() - volume.close() - def wait_for_events(self, cp, **condition): trigger = coroutine.AsyncResult() @@ -412,3 +382,5 @@ oLqnwHwnk4DFkdO7ZwIDAQAB -----END PUBLIC KEY----- """ UID2 = 'd820a3405d6aadf2cf207f6817db2a79f8fa07aa' + +NODE_UID = 'c41529f1d629e60bdc21434011133f2c8f65f643' diff --git a/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643 b/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643 new file mode 100644 index 0000000..f76506d --- /dev/null +++ b/tests/data/c41529f1d629e60bdc21434011133f2c8f65f643 @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQCv0awK/tGfUHaxQfQ5dB/he8ODgGClqZz1toFNLkUm5qVmWA97 +yszv+EVpDLLm+XtMwNhMBUIH3JcCrNiXOJJBW+Aj5pQ3Rby2RBZTJxasiCujNGq3 +8QKOmCLCp0tqQ2VyXtlNn+azg7Lz05RdzrprnDrwu/w7Q3UmY0s92G6sfQIDAQAB +AoGBAKYxfvvhxUpf5+JEYtQQRbaBo91g83qE6t6ExpKrQyizavNkGDa/C5tmRk43 +d8DHYNq7i3nImpMN3BzmP9Ip4mydORw1hGn79P20IeTozTeeBDifDMqxlopeVxeg +7VbYPyYJAU5Gz5dC3JXdf0qeBfPB9Rb5E3E8S+MPl3pFncTtAkEA5k3o1ja2lhCv +pe2/TrL8YEEhYY2KsB3DiL29CMuQwGePAd1GG5uSlZWhGC5zAbekmQKHa8oMHC7h +p0ZjtqYZKwJBAMNvhUiwDd+TQejQAtXbD1dk62TPvTnQW8dD558qNLlz/ernNpBj +v9Y3oq+bjkDzvpheY4K5xrg9W4TJancLLPcCQHjeLLO4FU1exoCD7SJFh3SQ2g8T +tNTHWia6xaoHBBomf4RP+ApnNKAy3lANmKgvFECFdkMY0BA+folGxPBH7e8CQQCF +FKSq+Y+I5gqkkTjNDW1l8ofETx2oh7RnfVr07FWYz15hne5u5i3Unm/+qqt0mUX5 +FZUniH/EJ6vxQQJpa8fDAkB6bV2mTrvx/Atk3ADIVWfI5PozZt9NoJKrMuWhoCvt +kp5Rs9gtr/hqo/mPLCTy3r+v+suAji/VS/OKApeRhmCz +-----END RSA PRIVATE KEY----- diff --git a/tests/integration/master_personal.py b/tests/integration/master_personal.py index ec87942..7bfa6aa 100755 --- a/tests/integration/master_personal.py +++ b/tests/integration/master_personal.py @@ -38,7 +38,7 @@ class MasterPersonalTest(tests.Test): '--obs-url=', ]) self.client_pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start', - '--api-url=http://127.0.0.1:8100', '--cachedir=client/tmp', + '--api=http://127.0.0.1:8100', '--cachedir=client/tmp', '-DDD', '--rundir=client/run', '--server-mode', '--layers=pilot', '--local-root=client', '--port=8101', '--index-flush-threshold=1', diff --git a/tests/integration/master_slave.py b/tests/integration/master_slave.py index 0bf9f4f..c9981df 100755 --- a/tests/integration/master_slave.py +++ b/tests/integration/master_slave.py @@ -38,7 +38,7 @@ class MasterSlaveTest(tests.Test): '--obs-url=', ]) self.slave_pid = self.popen([join(src_root, 'sugar-network-node'), '-F', 'start', - '--api-url=http://127.0.0.1:8100', + '--api=http://127.0.0.1:8100', '--port=8101', '--data-root=slave/db', '--cachedir=slave/tmp', '-DDD', '--rundir=slave/run', '--files-root=slave/files', '--stats-root=slave/stats', '--stats-user', '--stats-user-step=1', diff --git a/tests/integration/node_client.py b/tests/integration/node_client.py index 64fe97c..5dac3dc 100755 --- a/tests/integration/node_client.py +++ b/tests/integration/node_client.py @@ -134,14 +134,14 @@ class NodeClientTest(tests.Test): self.assertEqual(['clone'], json.load(file('client/db/context/co/context/layer'))['value']) def cli(self, cmd, stdin=None): - cmd = ['sugar-network', '--local-root=client', '--ipc-port=5101', '--api-url=http://127.0.0.1:8100', '-DDD'] + cmd + cmd = ['sugar-network', '--local-root=client', '--ipc-port=5101', '--api=http://127.0.0.1:8100', '-DDD'] + cmd if '--anonymous' not in cmd and not self.client_pid: self.client_pid = self.popen([join(src_root, 'sugar-network-client'), '-DDDF', 'start', '--local-root=client', '--mounts-root=mnt', '--cachedir=tmp', '--ipc-port=5101', - '--api-url=http://127.0.0.1:8100', + '--api=http://127.0.0.1:8100', ]) coroutine.sleep(2) ipc = Connection('http://127.0.0.1:5101') diff --git a/tests/integration/node_packages.py b/tests/integration/node_packages.py index 91b6f16..176fbc1 100755 --- a/tests/integration/node_packages.py +++ b/tests/integration/node_packages.py @@ -98,7 +98,7 @@ class NodePackagesSlaveTest(tests.Test): urllib2.urlopen('http://127.0.0.1:8100/packages/presolve/OLPC-11.3.1/rpm').read()) pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start', - '--api-url=http://127.0.0.1:8100', '--cachedir=master.client/tmp', + '--api=http://127.0.0.1:8100', '--cachedir=master.client/tmp', '-DDD', '--rundir=master.client/run', '--layers=pilot', '--local-root=master.client', '--index-flush-threshold=1', '--ipc-port=8200', @@ -116,7 +116,7 @@ class NodePackagesSlaveTest(tests.Test): # From slave self.pids.append(self.popen([join(src_root, 'sugar-network-node'), '-F', 'start', - '--api-url=http://127.0.0.1:8100', + '--api=http://127.0.0.1:8100', '--port=8101', '--data-root=slave/db', '--cachedir=slave/tmp', '-DDD', '--rundir=slave/run', '--files-root=slave/files', '--stats-root=slave/stats', '--stats-user', '--stats-user-step=1', @@ -136,7 +136,7 @@ class NodePackagesSlaveTest(tests.Test): urllib2.urlopen('http://127.0.0.1:8101/packages/presolve/OLPC-11.3.1/rpm').read()) pid = self.popen([join(src_root, 'sugar-network-client'), '-F', 'start', - '--api-url=http://127.0.0.1:8101', '--cachedir=master.client/tmp', + '--api=http://127.0.0.1:8101', '--cachedir=master.client/tmp', '-DDD', '--rundir=master.client/run', '--layers=pilot', '--local-root=master.client', '--index-flush-threshold=1', '--ipc-port=8200', @@ -155,7 +155,7 @@ class NodePackagesSlaveTest(tests.Test): os.makedirs('client/mnt/disk/sugar-network') self.pids.append(self.popen([join(src_root, 'sugar-network-client'), '-F', 'start', - '--api-url=http://127.0.0.1:8100', '--cachedir=client/tmp', + '--api=http://127.0.0.1:8100', '--cachedir=client/tmp', '-DDD', '--rundir=client/run', '--server-mode', '--layers=pilot', '--local-root=client', '--port=8102', '--index-flush-threshold=1', diff --git a/tests/regression/api.py b/tests/regression/api.py index ce89952..05bf21c 100755 --- a/tests/regression/api.py +++ b/tests/regression/api.py @@ -101,7 +101,7 @@ class Api(tests.Test): self.client_pid = self.popen([join(PROD_ROOT, 'sugar-network-client'), '-DDDF', 'start', '--local-root=client', '--mounts-root=mnt', '--cachedir=tmp', - '--ipc-port=%s' % client.ipc_port.value, '--api-url=%s' % client.api_url.value, + '--ipc-port=%s' % client.ipc_port.value, '--api=%s' % client.api.value, ]) def tearDown(self): diff --git a/tests/units/client/offline_routes.py b/tests/units/client/offline_routes.py index b14ccd6..c8ac58c 100755 --- a/tests/units/client/offline_routes.py +++ b/tests/units/client/offline_routes.py @@ -340,7 +340,7 @@ class OfflineRoutes(tests.Test): }] assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) self.node.stop() @@ -355,7 +355,7 @@ class OfflineRoutes(tests.Test): [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) def test_ServiceUnavailableWhileSolving(self): diff --git a/tests/units/client/online_routes.py b/tests/units/client/online_routes.py index cf14834..50df2ec 100755 --- a/tests/units/client/online_routes.py +++ b/tests/units/client/online_routes.py @@ -591,7 +591,7 @@ Can't find all required implementations: self.assertEqual('content', file(blob_path).read()) assert exists(clone_path + '/data.blob') self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/%s/%s' % (context[:2], context)))) self.assertEqual([ @@ -628,7 +628,7 @@ Can't find all required implementations: self.assertEqual('content', file(blob_path).read()) assert not lexists(clone_path) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/%s/%s' % (context[:2], context)))) self.assertEqual([ @@ -647,7 +647,7 @@ Can't find all required implementations: sorted(ipc.get(['context'], reply='layer')['result'])) assert exists(clone_path + '/data.blob') self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/%s/%s' % (context[:2], context)))) def test_clone_Activity(self): @@ -751,7 +751,7 @@ Can't find all required implementations: self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read()) assert exists(clone_path + '/data.blob/activity/activity.info') self.assertEqual( - [client.api_url.value, ['stable'], downloaded_solution], + [client.api.value, ['stable'], downloaded_solution], json.load(file('solutions/bu/bundle_id'))) self.assertEqual([ @@ -795,7 +795,7 @@ Can't find all required implementations: self.assertEqual(activity_info, file(blob_path + '/activity/activity.info').read()) assert not exists(clone_path) self.assertEqual( - [client.api_url.value, ['stable'], downloaded_solution], + [client.api.value, ['stable'], downloaded_solution], json.load(file('solutions/bu/bundle_id'))) self.assertEqual([ @@ -814,7 +814,7 @@ Can't find all required implementations: sorted(ipc.get(['context'], reply='layer')['result'])) assert exists(clone_path + '/data.blob/activity/activity.info') self.assertEqual( - [client.api_url.value, ['stable'], downloaded_solution], + [client.api.value, ['stable'], downloaded_solution], json.load(file('solutions/bu/bundle_id'))) def test_clone_ActivityWithStabilityPreferences(self): @@ -987,7 +987,7 @@ Can't find all required implementations: [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], downloaded_solution], + [client.api.value, ['stable'], downloaded_solution], json.load(file('solutions/bu/bundle_id'))) blob = self.zips(['TestActivity/activity/activity.info', [ @@ -1032,7 +1032,7 @@ Can't find all required implementations: [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) self.node.stop() @@ -1047,7 +1047,7 @@ Can't find all required implementations: [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) shutil.rmtree('solutions') @@ -1060,7 +1060,7 @@ Can't find all required implementations: [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) def test_launch_Fails(self): @@ -1142,7 +1142,7 @@ Can't find all required implementations: [i for i in ipc.get(['context', 'bundle_id'], cmd='launch', foo='bar')]) assert local['release'].exists(impl) self.assertEqual( - [client.api_url.value, ['stable'], solution], + [client.api.value, ['stable'], solution], json.load(file('solutions/bu/bundle_id'))) def test_InvalidateSolutions(self): @@ -1356,20 +1356,20 @@ Can't find all required implementations: @db.blob_property() def blob3(self, value): - raise http.Redirect(client.api_url.value + prefix + 'blob4') + raise http.Redirect(client.api.value + prefix + 'blob4') self.start_online_client([User, Document]) ipc = IPCConnection() guid = ipc.post(['document'], {}) prefix = '/document/' + guid + '/' - response = requests.request('GET', client.api_url.value + prefix + 'blob1', allow_redirects=False) + response = requests.request('GET', client.api.value + prefix + 'blob1', allow_redirects=False) self.assertEqual(303, response.status_code) self.assertEqual(prefix + 'blob2', response.headers['Location']) - response = requests.request('GET', client.api_url.value + prefix + 'blob3', allow_redirects=False) + response = requests.request('GET', client.api.value + prefix + 'blob3', allow_redirects=False) self.assertEqual(303, response.status_code) - self.assertEqual(client.api_url.value + prefix + 'blob4', response.headers['Location']) + self.assertEqual(client.api.value + prefix + 'blob4', response.headers['Location']) def test_DoNotSwitchToOfflineOnRedirectFails(self): @@ -1386,7 +1386,7 @@ Can't find all required implementations: local_volume = self.start_online_client([User, Document]) ipc = IPCConnection() guid = ipc.post(['document'], {}) - prefix = client.api_url.value + '/document/' + guid + '/' + prefix = client.api.value + '/document/' + guid + '/' local_volume['document'].create({'guid': guid}) trigger = self.wait_for_events(ipc, event='inline', state='connecting') @@ -1561,7 +1561,7 @@ Can't find all required implementations: def test_inline(self): routes._RECONNECT_TIMEOUT = 2 - cp = ClientRoutes(Volume('client', model.RESOURCES), client.api_url.value) + cp = ClientRoutes(Volume('client', model.RESOURCES), client.api.value) assert not cp.inline() trigger = self.wait_for_events(cp, event='inline', state='online') diff --git a/tests/units/client/releases.py b/tests/units/client/releases.py index 9c99cec..30f938e 100755 --- a/tests/units/client/releases.py +++ b/tests/units/client/releases.py @@ -206,11 +206,11 @@ class Releases(tests.Test): self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) - client.api_url.value = 'fake' + client.api.value = 'fake' self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.assertEqual(solution, file(cached_path).read()) - client.api_url.value = 'http://127.0.0.1:8888' + client.api.value = 'http://127.0.0.1:8888' self.assertEqual('exit', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['event']) self.client_routes._node_mtime = cached_mtime + 2 @@ -267,7 +267,7 @@ class Releases(tests.Test): }]]) self.touch(['solutions/bu/bundle_id', solution]) - client.api_url.value = 'fake' + client.api.value = 'fake' self.assertEqual('NotFound', [i for i in conn.get(['context', 'bundle_id'], cmd='launch')][-1]['exception']) self.node.stop() diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py index 9fad249..7cd03e6 100755 --- a/tests/units/client/routes.py +++ b/tests/units/client/routes.py @@ -130,7 +130,7 @@ class RoutesTest(tests.Test): def test_SetLocalLayerInOffline(self): volume = db.Volume('client', model.RESOURCES) - cp = ClientRoutes(volume, client.api_url.value) + cp = ClientRoutes(volume, client.api.value) post = Request(method='POST', path=['context']) post.content_type = 'application/json' post.content = { @@ -153,7 +153,7 @@ class RoutesTest(tests.Test): def test_CachedClientRoutes(self): volume = db.Volume('client', model.RESOURCES, lazy_open=True) - cp = CachedClientRoutes(volume, client.api_url.value) + cp = CachedClientRoutes(volume, client.api.value) post = Request(method='POST', path=['context']) post.content_type = 'application/json' @@ -216,7 +216,7 @@ class RoutesTest(tests.Test): def test_CachedClientRoutes_WipeReports(self): volume = db.Volume('client', model.RESOURCES, lazy_open=True) - cp = CachedClientRoutes(volume, client.api_url.value) + cp = CachedClientRoutes(volume, client.api.value) post = Request(method='POST', path=['report']) post.content_type = 'application/json' @@ -236,7 +236,7 @@ class RoutesTest(tests.Test): def test_CachedClientRoutes_OpenOnlyChangedResources(self): volume = db.Volume('client', model.RESOURCES, lazy_open=True) - cp = CachedClientRoutes(volume, client.api_url.value) + cp = CachedClientRoutes(volume, client.api.value) guid = call(cp, Request(method='POST', path=['context'], content_type='application/json', content={ 'type': 'activity', 'title': 'title', @@ -247,7 +247,7 @@ class RoutesTest(tests.Test): cp.close() volume = db.Volume('client', model.RESOURCES, lazy_open=True) - cp = CachedClientRoutes(volume, client.api_url.value) + cp = CachedClientRoutes(volume, client.api.value) trigger = self.wait_for_events(cp, event='push') self.start_master() @@ -260,7 +260,7 @@ class RoutesTest(tests.Test): def test_SwitchToOfflineForAbsentOnlineProps(self): volume = db.Volume('client', model.RESOURCES) - cp = ClientRoutes(volume, client.api_url.value) + cp = ClientRoutes(volume, client.api.value) post = Request(method='POST', path=['context']) post.content_type = 'application/json' diff --git a/tests/units/db/volume.py b/tests/units/db/volume.py index 04bd9fc..3b10d7e 100755 --- a/tests/units/db/volume.py +++ b/tests/units/db/volume.py @@ -24,7 +24,7 @@ from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter from sugar_network.toolkit.router import ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http +from sugar_network.toolkit import http, ranges class VolumeTest(tests.Test): @@ -78,7 +78,7 @@ class VolumeTest(tests.Test): {'content-type': 'application/octet-stream', 'content-length': '2', 'path': 'foo/2'}, {'commit': [[1, 5]]}, ], - [dict(i) for i in volume.diff(r, ['foo'])]) + [dict(i) for i in volume.diff(r, files=['foo'])]) self.assertEqual([[6, None]], r) r = [[2, 2]] @@ -126,9 +126,55 @@ class VolumeTest(tests.Test): {'content-type': 'application/octet-stream', 'content-length': '3', 'path': 'bar/3'}, {'commit': [[7, 9]]}, ], - [dict(i) for i in volume.diff(r, ['foo', 'bar'])]) + [dict(i) for i in volume.diff(r, files=['foo', 'bar'])]) self.assertEqual([[10, None]], r) + def test_diff_SyncUsecase(self): + + class Document(db.Resource): + + @db.stored_property() + def prop1(self, value): + return value + + @db.stored_property() + def prop2(self, value): + return value + + volume = db.Volume('.', [Document]) + + volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1, 'prop1': 1, 'prop2': 1}) + self.utime('db/document/gu/guid', 1) + + # Fresh update to pull + volume['document'].update('guid', {'prop1': 2}) + self.utime('db/document/gu/guid/prop1', 2) + + # Recently pushed + volume['document'].update('guid', {'prop2': 2}) + self.utime('db/document/gu/guid/prop2', 2) + + # Exclude `prop2` ack from the pull reanges + r = [[2, None]] + ranges.exclude(r, 3, 3) + self.assertEqual([ + {'resource': 'document'}, + ], + [dict(i) for i in volume.diff(r)]) + self.assertEqual([[2, 2], [4, None]], r) + + # Pass `prop2` ack in `exclude` + r = [[2, None]] + self.assertEqual([ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'prop1': {'value': 2, 'mtime': 2}, + }}, + {'commit': [[2, 2]]}, + ], + [dict(i) for i in volume.diff(r, [[3, 3]])]) + self.assertEqual([[4, None]], r) + def test_diff_Partial(self): self.override(time, 'time', lambda: 0) @@ -649,33 +695,33 @@ class VolumeTest(tests.Test): volume1 = db.Volume('db1', [Document]) volume2 = db.Volume('db2', [Document]) - committed, patched = volume2.patch(volume1.diff([[1, None]])) + seqno, committed = volume2.patch(volume1.diff([[1, None]])) self.assertEqual([], committed) - self.assertEqual([], patched) + self.assertEqual(None, seqno) volume1['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1}) - committed, patched = volume2.patch(volume1.diff([[1, None]])) + seqno, committed = volume2.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 1]], committed) - self.assertEqual([[1, 1]], patched) - committed, patched = volume2.patch(volume1.diff([[1, None]])) + self.assertEqual(1, seqno) + seqno, committed = volume2.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 1]], committed) - self.assertEqual([], patched) + self.assertEqual(None, seqno) volume1['document'].update('1', {'prop': '1'}) - committed, patched = volume2.patch(volume1.diff([[1, None]])) + seqno, committed = volume2.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 2]], committed) - self.assertEqual([[2, 2]], patched) - committed, patched = volume2.patch(volume1.diff([[1, None]])) + self.assertEqual(2, seqno) + seqno, committed = volume2.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 2]], committed) - self.assertEqual([], patched) + self.assertEqual(None, seqno) volume3 = db.Volume('db3', [Document]) - committed, patched = volume3.patch(volume1.diff([[1, None]])) + seqno, committed = volume3.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 2]], committed) - self.assertEqual([[1, 1]], patched) - committed, patched = volume3.patch(volume1.diff([[1, None]])) + self.assertEqual(1, seqno) + seqno, committed = volume3.patch(volume1.diff([[1, None]])) self.assertEqual([[1, 2]], committed) - self.assertEqual([], patched) + self.assertEqual(None, seqno) def test_patch_CallSetters(self): @@ -725,7 +771,7 @@ class VolumeTest(tests.Test): yield i patch = generator() - self.assertEqual(([[1, 3]], [[101, 101]]), volume.patch(patch)) + self.assertEqual((101, [[1, 3]]), volume.patch(patch)) assert volume['document'].exists('1') diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py index a191c8d..5fba512 100644 --- a/tests/units/node/__main__.py +++ b/tests/units/node/__main__.py @@ -2,15 +2,11 @@ from __init__ import tests -from downloads import * -from files import * -from node import * from obs import * -from stats_user import * -from sync_master import * -from sync_offline import * -from sync_online import * from model import * +from node import * +from master import * +from slave import * if __name__ == '__main__': tests.main() diff --git a/tests/units/node/downloads.py b/tests/units/node/downloads.py deleted file mode 100755 index 29ea5d0..0000000 --- a/tests/units/node/downloads.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import json -from os.path import exists - -from __init__ import tests - -from sugar_network.node import downloads -from sugar_network.toolkit import coroutine - - -class DownloadsTest(tests.Test): - - def test_populate(self): - self.touch(('1', '1')) - self.touch(('2', '2')) - self.touch(('2.tag', json.dumps((2, {'file': 2})))) - self.touch(('3.tag', json.dumps((3, {'file': 3})))) - - pool = downloads.Pool('.') - self.assertEqual(None, pool.get(1)) - self.assertEqual(2, pool.get(2).tag['file']) - self.assertEqual('2', pool.get(2).open().read()) - self.assertEqual(None, pool.get(3)) - - def test_ComplexKeys(self): - key = {-1: None} - - self.touch(('file', 'file')) - self.touch(('file.tag', json.dumps((key, {'file': 2})))) - - pool = downloads.Pool('.') - self.assertEqual('file', pool.get(key).open().read()) - key['foo'] = 'bar' - pool.set(key, None, lambda path: file(path, 'w').close()) - self.assertNotEqual(None, pool.get(key)) - - def test_set_Tags(self): - tag = [] - - def fetch(path): - with file(path, 'w') as f: - f.write('payload') - tag.append(True) - - pool = downloads.Pool('.') - dl = pool.set('key', tag, fetch) - self.assertEqual(False, dl.ready) - self.assertEqual(None, dl.open()) - self.assertEqual([], tag) - - coroutine.dispatch() - self.assertEqual(True, dl.ready) - self.assertEqual('payload', dl.open().read()) - self.assertEqual([True], tag) - - pool2 = downloads.Pool('.') - dl2 = pool2.get('key') - self.assertEqual(True, dl2.ready) - self.assertEqual('payload', dl2.open().read()) - self.assertEqual([True], tag) - - def test_Eject(self): - downloads._POOL_SIZE = 3 - pool = downloads.Pool('.') - - pool.set(1, None, lambda path: file(path, 'w').close()) - coroutine.dispatch() - file1 = pool.get(1).open().name - pool.set(2, None, lambda path: file(path, 'w').close()) - coroutine.dispatch() - file2 = pool.get(2).open().name - pool.set(3, None, lambda path: file(path, 'w').close()) - coroutine.dispatch() - file3 = pool.get(3).open().name - - assert pool.get(1) is not None - assert exists(file1) - assert exists(file1 + '.tag') - assert pool.get(2) is not None - assert exists(file2) - assert exists(file2 + '.tag') - assert pool.get(3) is not None - assert exists(file3) - assert exists(file3 + '.tag') - - pool.set(4, None, lambda path: file(path, 'w').close()) - pool.set(5, None, lambda path: file(path, 'w').close()) - pool.set(6, None, lambda path: file(path, 'w').close()) - - assert pool.get(1) is None - assert not exists(file1) - assert not exists(file1 + '.tag') - assert pool.get(2) is None - assert not exists(file2) - assert not exists(file2 + '.tag') - assert pool.get(3) is None - assert not exists(file3) - assert not exists(file3 + '.tag') - - def test_remove(self): - pool = downloads.Pool('.') - - pool.set(1, None, lambda path: file(path, 'w').close()) - coroutine.dispatch() - file1 = pool.get(1).open().name - assert pool.get(1) is not None - assert exists(file1) - assert exists(file1 + '.tag') - - pool.remove(1) - assert pool.get(1) is None - assert not exists(file1) - assert not exists(file1 + '.tag') - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/node/files.py b/tests/units/node/files.py deleted file mode 100755 index cfacc30..0000000 --- a/tests/units/node/files.py +++ /dev/null @@ -1,357 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import time -import json -from glob import glob -from os.path import exists -from cStringIO import StringIO - -from __init__ import tests - -from sugar_network import db, toolkit -from sugar_network.node import files - - -class FilesTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - self.uuid = 0 - self.override(toolkit, 'uuid', self.next_uuid) - - def next_uuid(self): - self.uuid += 1 - return str(self.uuid) - - def test_Index_Populate(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - os.utime('files', (1, 1)) - assert seeder.sync() - - assert not seeder.sync() - in_seq = toolkit.Sequence([[1, None]]) - self.assertEqual( - [{'op': 'commit', 'sequence': []}], - [i for i in seeder.diff(in_seq)]) - self.assertEqual(0, seqno.value) - assert not exists('index') - - self.touch(('files/1', '1')) - self.touch(('files/2/3', '3')) - self.touch(('files/4/5/6', '6')) - self.utime('files', 1) - os.utime('files', (1, 1)) - - assert not seeder.sync() - in_seq = toolkit.Sequence([[1, None]]) - self.assertEqual( - [{'op': 'commit', 'sequence': []}], - [i for i in seeder.diff(in_seq)]) - self.assertEqual(0, seqno.value) - assert not exists('index') - - self.utime('files', 2) - os.utime('files', (2, 2)) - - assert seeder.sync() - in_seq = toolkit.Sequence([[1, None]]) - self.assertEqual(sorted([ - {'op': 'commit', 'sequence': [[1, 3]]}, - {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, - {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '2/3'}, - {'op': 'update', 'blob_size': 1, 'blob': '6', 'path': '4/5/6'}, - ]), - sorted(files_diff(seeder, in_seq))) - self.assertEqual(3, seqno.value) - assert exists('index') - self.assertEqual( - [[ - [1, '1', os.stat('files/1').st_mtime], - [2, '2/3', os.stat('files/2/3').st_mtime], - [3, '4/5/6', os.stat('files/4/5/6').st_mtime], - ], - os.stat('files').st_mtime], - json.load(file('index'))) - - assert not seeder.sync() - in_seq = toolkit.Sequence([[4, None]]) - self.assertEqual( - [{'op': 'commit', 'sequence': []}], - [i for i in seeder.diff(in_seq)]) - self.assertEqual(3, seqno.value) - - def test_Index_SelectiveDiff(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.touch(('files/4', '4')) - self.touch(('files/5', '5')) - self.utime('files', 1) - - in_seq = toolkit.Sequence([[2, 2], [4, 10], [20, None]]) - self.assertEqual(sorted([ - {'op': 'commit', 'sequence': [[2, 5]]}, - {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'}, - {'op': 'update', 'blob_size': 1, 'blob': '4', 'path': '4'}, - {'op': 'update', 'blob_size': 1, 'blob': '5', 'path': '5'}, - ]), - sorted(files_diff(seeder, in_seq))) - - def test_Index_PartialDiff(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - - in_seq = toolkit.Sequence([[1, None]]) - diff = seeder.diff(in_seq) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record) - self.assertEqual({'op': 'commit', 'sequence': []}, diff.throw(StopIteration)) - self.assertRaises(StopIteration, diff.next) - - diff = seeder.diff(in_seq) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'}, record) - self.assertEqual({'op': 'commit', 'sequence': [[1, 1]]}, diff.throw(StopIteration)) - self.assertRaises(StopIteration, diff.next) - - def test_Index_diff_Stretch(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - - in_seq = toolkit.Sequence([[1, 1], [3, None]]) - diff = seeder.diff(in_seq) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, record) - self.assertEqual({'op': 'commit', 'sequence': [[1, 3]]}, next(diff)) - self.assertRaises(StopIteration, diff.next) - - def test_Index_diff_DoNotStretchContinuesPacket(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - - in_seq = toolkit.Sequence([[1, 1], [3, None]]) - diff = seeder.diff(in_seq, toolkit.Sequence([[1, 1]])) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, record) - record = next(diff) - record['blob'] = ''.join([i for i in record['blob']]) - self.assertEqual({'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, record) - self.assertEqual({'op': 'commit', 'sequence': [[1, 1], [3, 3]]}, next(diff)) - self.assertRaises(StopIteration, diff.next) - - def test_Index_DiffUpdatedFiles(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - os.utime('files', (1, 1)) - - for __ in seeder.diff(toolkit.Sequence([[1, None]])): - pass - self.assertEqual(3, seqno.value) - - os.utime('files/2', (2, 2)) - - self.assertEqual( - [{'op': 'commit', 'sequence': []}], - [i for i in seeder.diff(toolkit.Sequence([[4, None]]))]) - self.assertEqual(3, seqno.value) - - os.utime('files', (3, 3)) - - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'}, - {'op': 'commit', 'sequence': [[4, 4]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[4, None]])))) - self.assertEqual(4, seqno.value) - - os.utime('files/1', (4, 4)) - os.utime('files/3', (4, 4)) - os.utime('files', (4, 4)) - - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, - {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, - {'op': 'commit', 'sequence': [[5, 6]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[5, None]])))) - self.assertEqual(6, seqno.value) - - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, - {'op': 'update', 'blob_size': 1, 'blob': '2', 'path': '2'}, - {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, - {'op': 'commit', 'sequence': [[1, 6]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[1, None]])))) - self.assertEqual(6, seqno.value) - - def test_Index_DiffCreatedFiles(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - os.utime('files', (1, 1)) - - for __ in seeder.diff(toolkit.Sequence([[1, None]])): - pass - self.assertEqual(3, seqno.value) - - self.touch(('files/4', '4')) - os.utime('files/4', (2, 2)) - os.utime('files', (1, 1)) - - self.assertEqual( - [{'op': 'commit', 'sequence': []}], - [i for i in seeder.diff(toolkit.Sequence([[4, None]]))]) - self.assertEqual(3, seqno.value) - - os.utime('files/4', (2, 2)) - os.utime('files', (2, 2)) - - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '4', 'path': '4'}, - {'op': 'commit', 'sequence': [[4, 4]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[4, None]])))) - self.assertEqual(4, seqno.value) - - self.touch(('files/5', '5')) - os.utime('files/5', (3, 3)) - self.touch(('files/6', '6')) - os.utime('files/6', (3, 3)) - os.utime('files', (3, 3)) - - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '5', 'path': '5'}, - {'op': 'update', 'blob_size': 1, 'blob': '6', 'path': '6'}, - {'op': 'commit', 'sequence': [[5, 6]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[5, None]])))) - self.assertEqual(6, seqno.value) - - def test_Index_DiffDeletedFiles(self): - seqno = toolkit.Seqno(tests.tmpdir + '/seqno') - seeder = files.Index(tests.tmpdir + '/files', tests.tmpdir + '/index', seqno) - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - os.utime('files', (1, 1)) - - for __ in seeder.diff(toolkit.Sequence([[1, None]])): - pass - self.assertEqual(3, seqno.value) - - os.unlink('files/2') - os.utime('files', (2, 2)) - - assert seeder.sync() - self.assertEqual(sorted([ - {'op': 'update', 'blob_size': 1, 'blob': '1', 'path': '1'}, - {'op': 'update', 'blob_size': 1, 'blob': '3', 'path': '3'}, - {'op': 'delete', 'path': '2'}, - {'op': 'commit', 'sequence': [[1, 4]]}, - ]), - sorted(files_diff(seeder, toolkit.Sequence([[1, None]])))) - self.assertEqual(4, seqno.value) - - os.unlink('files/1') - os.unlink('files/3') - os.utime('files', (3, 3)) - - assert seeder.sync() - self.assertEqual(sorted([ - {'op': 'delete', 'path': '1'}, - {'op': 'delete', 'path': '2'}, - {'op': 'delete', 'path': '3'}, - {'op': 'commit', 'sequence': [[1, 6]]}, - ]), - sorted([i for i in seeder.diff(toolkit.Sequence([[1, None]]))])) - self.assertEqual(6, seqno.value) - - assert not seeder.sync() - self.assertEqual(sorted([ - {'op': 'delete', 'path': '1'}, - {'op': 'delete', 'path': '2'}, - {'op': 'delete', 'path': '3'}, - {'op': 'commit', 'sequence': [[1, 6]]}, - ]), - sorted([i for i in seeder.diff(toolkit.Sequence([[1, None]]))])) - self.assertEqual(6, seqno.value) - - def test_merge_Updated(self): - self.assertEqual('commit-sequence', files.merge('dst', [ - {'op': 'update', 'path': '1', 'blob': StringIO('1')}, - {'op': 'update', 'path': '2/2', 'blob': StringIO('22')}, - {'op': 'update', 'path': '3/3/3', 'blob': StringIO('333')}, - {'op': 'commit', 'sequence': 'commit-sequence'}, - ])) - self.assertEqual('1', file('dst/1').read()) - self.assertEqual('22', file('dst/2/2').read()) - self.assertEqual('333', file('dst/3/3/3').read()) - - def test_merge_Deleted(self): - self.touch('dst/1') - self.touch('dst/2/2') - - self.assertEqual('commit-sequence', files.merge('dst', [ - {'op': 'delete', 'path': '1'}, - {'op': 'delete', 'path': '2/2'}, - {'op': 'delete', 'path': '3/3/3'}, - {'op': 'commit', 'sequence': 'commit-sequence'}, - ])) - assert not exists('dst/1') - assert not exists('dst/2/2') - assert not exists('dst/3/3/3') - - -def files_diff(index, in_seq, out_seq=None, **kwargs): - for record in index.diff(in_seq, out_seq=out_seq, **kwargs): - if 'blob' in record: - record['blob'] = ''.join([i for i in record['blob']]) - yield record - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/node/master.py b/tests/units/node/master.py new file mode 100755 index 0000000..69fc6a7 --- /dev/null +++ b/tests/units/node/master.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +import gzip +import time +import json +import base64 +import hashlib +from glob import glob +from os.path import join, exists, basename +from StringIO import StringIO +from base64 import b64decode, b64encode + +import rrdtool + +from __init__ import tests + +from sugar_network.client import Connection, keyfile, api +from sugar_network.db.directory import Directory +from sugar_network import db, node, toolkit +from sugar_network.node.master import MasterRoutes +from sugar_network.db.volume import Volume +from sugar_network.model.user import User +from sugar_network.toolkit.router import Response, File +from sugar_network.toolkit import coroutine, parcel, http + + +class MasterTest(tests.Test): + + def setUp(self): + tests.Test.setUp(self) + + def next_uuid(): + self.uuid += 1 + return self.uuid + + self.uuid = 0 + self.override(toolkit, 'uuid', next_uuid) + + def test_push(self): + + class Document(db.Resource): + pass + + volume = self.start_master([Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + self.touch(('blob1', '1')) + self.touch(('blob2', '2')) + + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + File('./blob1', meta={'content-length': '1'}), + File('./blob2', meta={'content-length': '1', 'path': 'foo/bar'}), + {'commit': [[1, 3]]}, + ]), + ], header={'to': self.node_routes.guid, 'from': 'slave'})) + response = conn.request('POST', [], patch, params={'cmd': 'push'}) + reply = parcel.decode(response.raw) + + assert volume['document'].exists('1') + blob = volume.blobs.get(hashlib.sha1('1').hexdigest()) + self.assertEqual('1', ''.join(blob.iter_content())) + blob = volume.blobs.get('foo/bar') + self.assertEqual('2', ''.join(blob.iter_content())) + + self.assertEqual({ + 'packet': 'ack', + 'from': self.node_routes.guid, + 'to': 'slave', + 'ack': [[1, 1]], + 'ranges': [[1, 3]], + }, + next(reply).header) + self.assertRaises(StopIteration, next, reply) + + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'ack': { + 'slave': [[[[1, 3]], [[1, 1]]]], + }, + })), + response.headers['set-cookie']) + + def test_push_MisaddressedPackets(self): + + class Document(db.Resource): + pass + + volume = self.start_master([Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ]), + ], header={'from': 'slave'})) + self.assertRaises(http.BadRequest, conn.request, 'POST', [], patch, params={'cmd': 'push'}) + + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ]), + ], header={'to': 'fake', 'from': 'slave'})) + self.assertRaises(http.BadRequest, conn.request, 'POST', [], patch, params={'cmd': 'push'}) + + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ]), + ], header={'to': '127.0.0.1:7777', 'from': 'slave'})) + conn.request('POST', [], patch, params={'cmd': 'push'}) + + def test_push_WithCookies(self): + + class Document(db.Resource): + pass + + volume = self.start_master([Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + self.touch(('blob', 'blob')) + + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + File('./blob', meta={'content-length': str(len('blob'))}), + {'commit': [[1, 2]]}, + ]), + ], header={'to': self.node_routes.guid, 'from': 'slave'})) + response = conn.request('POST', [], patch, params={'cmd': 'push'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'ack': { + 'slave': [[[[100, 100]], [[200, 200]]]], + }, + })), + }) + reply = parcel.decode(response.raw) + + assert volume['document'].exists('1') + blob_digest = hashlib.sha1('blob').hexdigest() + blob = volume.blobs.get(blob_digest) + self.assertEqual('blob', ''.join(blob.iter_content())) + + self.assertEqual({ + 'packet': 'ack', + 'from': self.node_routes.guid, + 'to': 'slave', + 'ack': [[1, 1]], + 'ranges': [[1, 2]], + }, + next(reply).header) + self.assertRaises(StopIteration, next, reply) + + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'ack': { + 'slave': [[[[100, 100]], [[200, 200]]], [[[1, 2]], [[1, 1]]]], + }, + })), + response.headers['set-cookie']) + + def test_push_Forward(self): + + class Document(db.Resource): + pass + + volume = self.start_master([Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + self.touch(('blob', 'blob')) + + patch = ''.join(parcel.encode([ + ('pull', {'ranges': [[1, None]]}, []), + ('request', {'for': 1}, []), + ], header={'to': self.node_routes.guid, 'from': 'slave'})) + response = conn.request('POST', [], patch, params={'cmd': 'push'}) + reply = parcel.decode(response.raw) + self.assertRaises(StopIteration, next, reply) + + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': {'slave': []}, + 'request': [ + {'to': '127.0.0.1:7777', 'from': 'slave', 'packet': 'request', 'for': 1}, + ], + })), + response.headers['set-cookie']) + + def test_pull(self): + + class Document(db.Resource): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1}) + self.utime('master/db/document/gu/guid', 1) + blob = volume.blobs.post('a') + self.touch(('master/files/foo/bar', 'bb')) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'ctime': {'mtime': 1, 'value': 1}, + 'guid': {'mtime': 1, 'value': 'guid'}, + 'mtime': {'mtime': 1, 'value': 1}, + }}, + {'content-length': '1', 'content-type': 'application/octet-stream'}, + {'content-length': '2', 'content-type': 'application/octet-stream', 'path': 'foo/bar'}, + {'commit': [[1, 3]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'id': 1, + 'pull': [[1, None]], + })), + response.headers['set-cookie']) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={'cookie': response.headers['set-cookie']}) + assert not response.raw.read() + self.assertEqual( + 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly', + response.headers['set-cookie']) + + def test_pull_ExcludeAcks(self): + + class Document(db.Resource): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1}) + self.utime('master/db/document/1/1', 1) + blob = volume.blobs.post('blob') + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': { + 'node': [[[[0, 0]], [[1, 1]]], [[[0, 0]], [[2, 2]]]], + }, + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]), + ], + [(packet.header, [record for record in packet]) for packet in parcel.decode(response.raw)]) + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'id': 1, + 'pull': [[1, None]], + 'ack': { + 'node': [[[[0, 0]], [[1, 1]]], [[[0, 0]], [[2, 2]]]], + }, + })), + response.headers['set-cookie']) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={'cookie': response.headers['set-cookie']}) + assert not response.raw.read() + self.assertEqual( + 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly', + response.headers['set-cookie']) + + def test_pull_ExcludeOnlyAcksIntersection(self): + + class Document(db.Resource): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1}) + self.utime('master/db/document/1/1', 1) + blob1 = volume.blobs.post('a') + volume['document'].create({'guid': '2', 'ctime': 2, 'mtime': 2}) + self.utime('master/db/document/2/2', 2) + blob2 = volume.blobs.post('bb') + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': { + 'node1': [[[[0, 0]], [[1, 4]]]], + 'node2': [[[[0, 0]], [[1, 4]]]], + }, + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': { + 'node1': [[[[0, 0]], [[1, 4]]]], + 'node2': [[[[0, 0]], [[2, 4]]]], + }, + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'mtime': 1, 'value': '1'}, + 'ctime': {'mtime': 1, 'value': 1}, + 'mtime': {'mtime': 1, 'value': 1}, + }}, + {'commit': [[1, 1]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': { + 'node1': [[[[0, 0]], [[1, 4]]]], + 'node2': [[[[0, 0]], [[1, 3]]]], + }, + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'content-length': '2', 'content-type': 'application/octet-stream'}, + {'commit': [[4, 4]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + + def test_pull_ExcludeAckRequests(self): + + class Document(db.Resource): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1}) + self.utime('master/db/document/1/1', 1) + blob = volume.blobs.post('blob') + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + 'ack': { + 'node1': [[[[0, 0]], [[1, 2]]]], + 'node2': [], + }, + 'request': [ + {'from': 'node2', 'origin': 'node1', 'ranges': [[0, 0]]}, + ], + })) + }) + reply = parcel.decode(response.raw) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'to': 'node2', 'packet': 'ack', 'ack': [[1, 2]]}, []), + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [{'resource': 'document'}]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in reply]) + + def test_pull_Limitted(self): + RECORD = 1024 * 1024 + + class Document(db.Resource): + + @db.stored_property() + def prop(self): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': '1', 'ctime': 1, 'mtime': 1, 'prop': '.' * RECORD}) + self.utime('master/db/document/1/1', 1) + volume['document'].create({'guid': '2', 'ctime': 2, 'mtime': 2, 'prop': '.' * RECORD}) + self.utime('master/db/document/2/2', 2) + volume['document'].create({'guid': '3', 'ctime': 3, 'mtime': 3, 'prop': '.' * RECORD}) + self.utime('master/db/document/3/3', 3) + + response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * .5)}, headers={ + 'cookie': 'sugar_network_node=%s' % b64encode(json.dumps({ + 'pull': [[1, None]], + })) + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'id': 1, + 'pull': [[1, None]], + })), + response.headers['set-cookie']) + + response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * 1.5)}, headers={ + 'cookie': response.headers['set-cookie'], + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'ctime': {'mtime': 1, 'value': 1}, + 'guid': {'mtime': 1, 'value': '1'}, + 'mtime': {'mtime': 1, 'value': 1}, + 'prop': {'mtime': 1, 'value': '.' * RECORD}, + }}, + {'commit': [[1, 1]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'id': 1, + 'pull': [[1, None]], + })), + response.headers['set-cookie']) + + response = conn.request('GET', [], params={'cmd': 'pull', 'accept_length': int(RECORD * 2.5)}, headers={ + 'cookie': response.headers['set-cookie'], + }) + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'guid': '2', 'patch': { + 'ctime': {'mtime': 2, 'value': 2}, + 'guid': {'mtime': 2, 'value': '2'}, + 'mtime': {'mtime': 2, 'value': 2}, + 'prop': {'mtime': 2, 'value': '.' * RECORD}, + }}, + {'guid': '3', 'patch': { + 'ctime': {'mtime': 3, 'value': 3}, + 'guid': {'mtime': 3, 'value': '3'}, + 'mtime': {'mtime': 3, 'value': 3}, + 'prop': {'mtime': 3, 'value': '.' * RECORD}, + }}, + {'commit': [[2, 3]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + self.assertEqual( + 'sugar_network_node=%s; Max-Age=3600; HttpOnly' % b64encode(json.dumps({ + 'id': 1, + 'pull': [[2, None]], + })), + response.headers['set-cookie']) + + response = conn.request('GET', [], params={'cmd': 'pull'}, headers={ + 'cookie': response.headers['set-cookie'], + }) + assert not response.raw.read() + self.assertEqual( + 'sugar_network_node=unset_sugar_network_node; Max-Age=3600; HttpOnly', + response.headers['set-cookie']) + + def test_sync(self): + + class Document(db.Resource): + pass + + volume = self.start_master([User, Document]) + conn = Connection(auth=http.SugarAuth(keyfile.value)) + + volume['document'].create({'guid': 'guid', 'ctime': 1, 'mtime': 1}) + self.utime('master/db/document/gu/guid', 1) + blob1 = volume.blobs.post('1') + + self.touch(('blob2', 'ccc')) + patch = ''.join(parcel.encode([ + ('push', None, [ + {'resource': 'document'}, + {'guid': '2', 'patch': { + 'guid': {'value': '2', 'mtime': 2}, + 'ctime': {'value': 2, 'mtime': 2}, + 'mtime': {'value': 2, 'mtime': 2}, + }}, + File('./blob2', meta={'content-length': '3'}), + {'commit': [[1, 2]]}, + ]), + ('pull', {'ranges': [[1, None]]}, []), + ], header={'to': '127.0.0.1:7777', 'from': 'node'})) + response = conn.request('POST', [], patch, params={'cmd': 'sync'}) + blob2 = volume.blobs.get(hashlib.sha1('ccc').hexdigest()) + + self.assertEqual([ + ({'from': '127.0.0.1:7777', 'to': 'node', 'packet': 'ack', 'ack': [[3, 3]], 'ranges': [[1, 2]]}, []), + ({'from': '127.0.0.1:7777', 'packet': 'push'}, [ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'ctime': {'mtime': 1, 'value': 1}, + 'guid': {'mtime': 1, 'value': 'guid'}, + 'mtime': {'mtime': 1, 'value': 1}, + }}, + {'content-length': '1', 'content-type': 'application/octet-stream'}, + {'commit': [[1, 2]]}, + ]), + ], + [(packet.header, [dict(record) for record in packet]) for packet in parcel.decode(response.raw)]) + + assert volume['document'].exists('2') + self.assertEqual('ccc', ''.join(blob2.iter_content())) + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/node/model.py b/tests/units/node/model.py index 12b5a21..36937dc 100755 --- a/tests/units/node/model.py +++ b/tests/units/node/model.py @@ -7,7 +7,7 @@ import time from __init__ import tests from sugar_network import db, toolkit -from sugar_network.client import Connection, keyfile, api_url +from sugar_network.client import Connection, keyfile, api from sugar_network.model.user import User from sugar_network.model.post import Post from sugar_network.model.context import Context @@ -42,404 +42,6 @@ class ModelTest(tests.Test): ], [i for i in events if i['event'] == 'release']) self.assertEqual(1, volume.releases_seqno.value) - def test_diff(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = self.start_master([User, Document]) - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - guid1 = conn.post(['document'], {'prop': 'a'}) - self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1) - guid2 = conn.post(['document'], {'prop': 'b'}) - self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2) - - in_seq = toolkit.Sequence([[1, None]]) - self.assertEqual([ - {'resource': 'document'}, - {'guid': guid1, - 'diff': { - 'guid': {'value': guid1, 'mtime': 1}, - 'mtime': {'value': 0, 'mtime': 1}, - 'ctime': {'value': 0, 'mtime': 1}, - 'prop': {'value': 'a', 'mtime': 1}, - 'author': {'mtime': 1, 'value': {}}, - 'layer': {'mtime': 1, 'value': []}, - 'tags': {'mtime': 1, 'value': []}, - }, - }, - {'guid': guid2, - 'diff': { - 'guid': {'value': guid2, 'mtime': 2}, - 'mtime': {'value': 0, 'mtime': 2}, - 'ctime': {'value': 0, 'mtime': 2}, - 'prop': {'value': 'b', 'mtime': 2}, - 'author': {'mtime': 2, 'value': {}}, - 'layer': {'mtime': 2, 'value': []}, - 'tags': {'mtime': 2, 'value': []}, - }, - }, - {'commit': [[1, 2]]}, - ], - [i for i in model.diff(volume, in_seq)]) - self.assertEqual([[1, None]], in_seq) - - def test_diff_Partial(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = self.start_master([User, Document]) - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - guid1 = conn.post(['document'], {'prop': 'a'}) - self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1) - guid2 = conn.post(['document'], {'prop': 'b'}) - self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2) - - in_seq = toolkit.Sequence([[1, None]]) - patch = model.diff(volume, in_seq) - self.assertEqual({'resource': 'document'}, next(patch)) - self.assertEqual(guid1, next(patch)['guid']) - self.assertEqual({'commit': []}, patch.throw(StopIteration())) - try: - next(patch) - assert False - except StopIteration: - pass - - patch = model.diff(volume, in_seq) - self.assertEqual({'resource': 'document'}, next(patch)) - self.assertEqual(guid1, next(patch)['guid']) - self.assertEqual(guid2, next(patch)['guid']) - self.assertEqual({'commit': [[1, 1]]}, patch.throw(StopIteration())) - try: - next(patch) - assert False - except StopIteration: - pass - - def test_diff_Stretch(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = self.start_master([User, Document]) - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - guid1 = conn.post(['document'], {'prop': 'a'}) - self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1) - guid2 = conn.post(['document'], {'prop': 'b'}) - volume['document'].delete(guid2) - guid3 = conn.post(['document'], {'prop': 'c'}) - self.utime('master/document/%s/%s' % (guid3[:2], guid3), 2) - guid4 = conn.post(['document'], {'prop': 'd'}) - volume['document'].delete(guid4) - guid5 = conn.post(['document'], {'prop': 'f'}) - self.utime('master/document/%s/%s' % (guid5[:2], guid5), 2) - - in_seq = toolkit.Sequence([[1, None]]) - patch = model.diff(volume, in_seq) - self.assertEqual({'resource': 'document'}, patch.send(None)) - self.assertEqual(guid1, patch.send(None)['guid']) - self.assertEqual(guid3, patch.send(None)['guid']) - self.assertEqual(guid5, patch.send(None)['guid']) - self.assertEqual({'commit': [[1, 1], [3, 3]]}, patch.throw(StopIteration())) - try: - patch.send(None) - assert False - except StopIteration: - pass - - patch = model.diff(volume, in_seq) - self.assertEqual({'resource': 'document'}, patch.send(None)) - self.assertEqual(guid1, patch.send(None)['guid']) - self.assertEqual(guid3, patch.send(None)['guid']) - self.assertEqual(guid5, patch.send(None)['guid']) - self.assertEqual({'commit': [[1, 5]]}, patch.send(None)) - try: - patch.send(None) - assert False - except StopIteration: - pass - - def test_diff_DoNotStretchContinuesPacket(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = self.start_master([User, Document]) - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - guid1 = conn.post(['document'], {'prop': 'a'}) - volume['document'].delete(guid1) - guid2 = conn.post(['document'], {'prop': 'b'}) - volume['document'].delete(guid2) - guid3 = conn.post(['document'], {'prop': 'c'}) - self.utime('master/document/%s/%s' % (guid3[:2], guid3), 2) - guid4 = conn.post(['document'], {'prop': 'd'}) - volume['document'].delete(guid4) - guid5 = conn.post(['document'], {'prop': 'f'}) - self.utime('master/document/%s/%s' % (guid5[:2], guid5), 2) - - in_seq = toolkit.Sequence([[1, None]]) - patch = model.diff(volume, in_seq, toolkit.Sequence([[1, 1]])) - self.assertEqual({'resource': 'document'}, patch.send(None)) - self.assertEqual(guid3, patch.send(None)['guid']) - self.assertEqual(guid5, patch.send(None)['guid']) - self.assertEqual({'commit': [[1, 1], [3, 3], [5, 5]]}, patch.send(None)) - try: - patch.send(None) - assert False - except StopIteration: - pass - - def test_diff_TheSameInSeqForAllDocuments(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document1(db.Resource): - pass - - class Document2(db.Resource): - pass - - class Document3(db.Resource): - pass - - volume = self.start_master([User, Document1, Document2, Document3]) - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - guid3 = conn.post(['document1'], {}) - self.utime('master/document/%s/%s' % (guid3[:2], guid3), 3) - guid2 = conn.post(['document2'], {}) - self.utime('master/document/%s/%s' % (guid2[:2], guid2), 2) - guid1 = conn.post(['document3'], {}) - self.utime('master/document/%s/%s' % (guid1[:2], guid1), 1) - - in_seq = toolkit.Sequence([[1, None]]) - patch = model.diff(volume, in_seq) - self.assertEqual({'resource': 'document1'}, patch.send(None)) - self.assertEqual(guid3, patch.send(None)['guid']) - self.assertEqual({'resource': 'document2'}, patch.send(None)) - self.assertEqual(guid2, patch.send(None)['guid']) - self.assertEqual({'resource': 'document3'}, patch.send(None)) - self.assertEqual(guid1, patch.send(None)['guid']) - self.assertEqual({'commit': [[1, 3]]}, patch.send(None)) - try: - patch.send(None) - assert False - except StopIteration: - pass - - def test_merge_Create(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document1(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - class Document2(db.Resource): - pass - - self.touch(('master/db.seqno', '100')) - volume = self.start_master([Document1, Document2]) - - records = [ - {'resource': 'document1'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'ctime': {'value': 2, 'mtime': 2.0}, - 'mtime': {'value': 3, 'mtime': 3.0}, - 'prop': {'value': '4', 'mtime': 4.0}, - }}, - {'resource': 'document2'}, - {'guid': '5', 'diff': { - 'guid': {'value': '5', 'mtime': 5.0}, - 'ctime': {'value': 6, 'mtime': 6.0}, - 'mtime': {'value': 7, 'mtime': 7.0}, - }}, - {'commit': [[1, 2]]}, - ] - self.assertEqual(([[1, 2]], [[101, 102]]), model.merge(volume, records)) - - self.assertEqual( - {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3}, - volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(1, os.stat('master/document1/1/1/guid').st_mtime) - self.assertEqual(2, os.stat('master/document1/1/1/ctime').st_mtime) - self.assertEqual(3, os.stat('master/document1/1/1/mtime').st_mtime) - self.assertEqual(4, os.stat('master/document1/1/1/prop').st_mtime) - - self.assertEqual( - {'guid': '5', 'ctime': 6, 'mtime': 7}, - volume['document2'].get('5').properties(['guid', 'ctime', 'mtime'])) - self.assertEqual(5, os.stat('master/document2/5/5/guid').st_mtime) - self.assertEqual(6, os.stat('master/document2/5/5/ctime').st_mtime) - self.assertEqual(7, os.stat('master/document2/5/5/mtime').st_mtime) - - def test_merge_Update(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - self.touch(('master/db.seqno', '100')) - volume = db.Volume('master', [Document]) - volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}) - for i in os.listdir('master/document/1/1'): - os.utime('master/document/1/1/%s' % i, (2, 2)) - - records = [ - {'resource': 'document'}, - {'guid': '1', 'diff': {'prop': {'value': '2', 'mtime': 1.0}}}, - {'commit': [[1, 1]]}, - ] - self.assertEqual(([[1, 1]], []), model.merge(volume, records)) - self.assertEqual( - {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(2, os.stat('master/document/1/1/prop').st_mtime) - - records = [ - {'resource': 'document'}, - {'guid': '1', 'diff': {'prop': {'value': '3', 'mtime': 2.0}}}, - {'commit': [[2, 2]]}, - ] - self.assertEqual(([[2, 2]], []), model.merge(volume, records)) - self.assertEqual( - {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(2, os.stat('master/document/1/1/prop').st_mtime) - - records = [ - {'resource': 'document'}, - {'guid': '1', 'diff': {'prop': {'value': '4', 'mtime': 3.0}}}, - {'commit': [[3, 3]]}, - ] - self.assertEqual(([[3, 3]], [[102, 102]]), model.merge(volume, records)) - self.assertEqual( - {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(3, os.stat('master/document/1/1/prop').st_mtime) - - def test_merge_MultipleCommits(self): - self.override(time, 'time', lambda: 0) - - class Document(db.Resource): - - @db.stored_property() - def prop(self, value): - return value - - self.touch(('master/db.seqno', '100')) - volume = db.Volume('master', [Document]) - - def generator(): - for i in [ - {'resource': 'document'}, - {'commit': [[1, 1]]}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'ctime': {'value': 2, 'mtime': 2.0}, - 'mtime': {'value': 3, 'mtime': 3.0}, - 'prop': {'value': '4', 'mtime': 4.0}, - }}, - {'commit': [[2, 3]]}, - ]: - yield i - - records = generator() - self.assertEqual(([[1, 3]], [[101, 101]]), model.merge(volume, records)) - assert volume['document'].exists('1') - - def test_diff_ByLayers(self): - self.override(time, 'time', lambda: 0) - self.override(NodeRoutes, 'authorize', lambda self, user, role: True) - - class Context(db.Resource): - pass - - class Post(db.Resource): - pass - - this.request = Request() - volume = db.Volume('db', [Context, Post]) - volume['context'].create({'guid': '0', 'ctime': 1, 'mtime': 1, 'layer': ['layer0', 'common']}) - volume['context'].create({'guid': '1', 'ctime': 1, 'mtime': 1, 'layer': ['layer1']}) - volume['post'].create({'guid': '3', 'ctime': 3, 'mtime': 3, 'layer': 'layer3'}) - - volume['context'].update('0', {'tags': '0'}) - volume['context'].update('1', {'tags': '1'}) - volume['post'].update('3', {'tags': '3'}) - self.utime('db', 0) - - self.assertEqual(sorted([ - {'resource': 'context'}, - {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}}, - {'guid': '1', 'diff': {'tags': {'value': '1', 'mtime': 0}}}, - {'resource': 'post'}, - {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}}, - {'commit': [[4, 6]]}, - ]), - sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]))])) - - self.assertEqual(sorted([ - {'resource': 'context'}, - {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}}, - {'guid': '1', 'diff': {'tags': {'value': '1', 'mtime': 0}}}, - {'resource': 'post'}, - {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}}, - {'commit': [[4, 6]]}, - ]), - sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='layer1')])) - - self.assertEqual(sorted([ - {'resource': 'context'}, - {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}}, - {'resource': 'post'}, - {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}}, - {'commit': [[4, 6]]}, - ]), - sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='layer2')])) - - self.assertEqual(sorted([ - {'resource': 'context'}, - {'guid': '0', 'diff': {'tags': {'value': '0', 'mtime': 0}}}, - {'resource': 'post'}, - {'guid': '3', 'diff': {'tags': {'value': '3', 'mtime': 0}}}, - {'commit': [[4, 6]]}, - ]), - sorted([i for i in model.diff(volume, toolkit.Sequence([[4, None]]), layer='foo')])) - def test_Packages(self): self.override(obs, 'get_repos', lambda: [ {'lsb_id': 'Gentoo', 'lsb_release': '2.1', 'name': 'Gentoo-2.1', 'arches': ['x86', 'x86_64']}, @@ -449,7 +51,7 @@ class ModelTest(tests.Test): self.override(obs, 'resolve', lambda repo, arch, names: {'version': '1.0'}) volume = self.start_master([User, model.Context]) - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) guid = conn.post(['context'], { 'type': 'package', @@ -463,7 +65,7 @@ class ModelTest(tests.Test): }) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']}, }, @@ -487,7 +89,7 @@ class ModelTest(tests.Test): }) self.assertEqual({ 'Gentoo': { - 'seqno': 5, + 'seqno': 6, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']}, }, @@ -509,7 +111,7 @@ class ModelTest(tests.Test): }) self.assertEqual({ 'Debian-6.0': { - 'seqno': 7, + 'seqno': 8, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']}, }, @@ -526,7 +128,7 @@ class ModelTest(tests.Test): self.override(obs, 'resolve', lambda repo, arch, names: enforce(False, 'resolve failed')) volume = self.start_master([User, model.Context]) - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) guid = conn.post(['context'], { 'type': 'package', @@ -540,7 +142,7 @@ class ModelTest(tests.Test): }) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['pkg1.bin', 'pkg2.bin'], 'devel': ['pkg3.devel']}, }, @@ -558,7 +160,7 @@ class ModelTest(tests.Test): ]) volume = self.start_master([User, model.Context]) - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) guid = conn.post(['context'], { 'type': 'package', 'title': 'title', @@ -570,7 +172,7 @@ class ModelTest(tests.Test): conn.put(['context', guid, 'releases', '*'], {'binary': '1'}) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['1']}, }, @@ -586,12 +188,12 @@ class ModelTest(tests.Test): conn.put(['context', guid, 'releases', 'Debian'], {'binary': '2'}) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['1']}, }, 'Debian': { - 'seqno': 4, + 'seqno': 5, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['2']}, }, @@ -607,17 +209,17 @@ class ModelTest(tests.Test): conn.put(['context', guid, 'releases', 'Debian-6.0'], {'binary': '3'}) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['1']}, }, 'Debian': { - 'seqno': 4, + 'seqno': 5, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['2']}, }, 'Debian-6.0': { - 'seqno': 5, + 'seqno': 6, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['3']}, }, @@ -633,17 +235,17 @@ class ModelTest(tests.Test): conn.put(['context', guid, 'releases', 'Debian'], {'binary': '4'}) self.assertEqual({ '*': { - 'seqno': 3, + 'seqno': 4, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['1']}, }, 'Debian': { - 'seqno': 6, + 'seqno': 7, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['4']}, }, 'Debian-6.0': { - 'seqno': 5, + 'seqno': 6, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': {'binary': ['3']}, }, diff --git a/tests/units/node/node.py b/tests/units/node/node.py index 82d4e43..36c6285 100755 --- a/tests/units/node/node.py +++ b/tests/units/node/node.py @@ -16,10 +16,8 @@ from os.path import exists, join from __init__ import tests from sugar_network import db, node, model, client -from sugar_network.client import Connection, keyfile, api_url +from sugar_network.client import Connection, keyfile, api from sugar_network.toolkit import http, coroutine -from sugar_network.toolkit.rrd import Rrd -from sugar_network.node import stats_user from sugar_network.node.routes import NodeRoutes from sugar_network.node.master import MasterRoutes from sugar_network.model.user import User @@ -32,74 +30,6 @@ from sugar_network.toolkit import http class NodeTest(tests.Test): - def setUp(self): - tests.Test.setUp(self) - node.stats_root.value = 'stats' - stats_user.stats_user_step.value = 1 - stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100'] - - def test_UserStats(self): - volume = self.start_master() - conn = Connection(auth=http.SugarAuth(keyfile.value)) - - this.call(method='POST', path=['user'], principal=tests.UID, content={ - 'name': 'user', - 'pubkey': tests.PUBKEY, - }) - - ts = int(time.time()) - - self.assertEqual({ - 'enable': True, - 'status': {}, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'step': stats_user.stats_user_step.value, - }, - this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID)) - - this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={ - 'name': 'test', - 'values': [(ts + 1, {'field': '1'})], - }) - - self.assertEqual({ - 'enable': True, 'status': { - 'test': ts + 2, - }, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'step': stats_user.stats_user_step.value, - }, - this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID)) - - this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={ - 'name': 'test', - 'values': [(ts + 2, {'field': '2'})], - }) - - self.assertEqual({ - 'enable': True, 'status': { - 'test': ts + 3, - }, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'step': stats_user.stats_user_step.value, - }, - this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID)) - - this.call(method='POST', cmd='stats-upload', path=['user', tests.UID], principal=tests.UID, content={ - 'name': 'test2', - 'values': [(ts + 3, {'field': '3'})], - }) - - self.assertEqual({ - 'enable': True, 'status': { - 'test': ts + 3, - 'test2': ts + 4, - }, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'step': stats_user.stats_user_step.value, - }, - this.call(method='GET', cmd='stats-info', path=['user', tests.UID], principal=tests.UID)) - def test_HandleDeletes(self): volume = self.start_master() conn = Connection(auth=http.SugarAuth(keyfile.value)) @@ -111,7 +41,7 @@ class NodeTest(tests.Test): 'summary': 'summary', 'description': 'description', }) - guid_path = 'master/context/%s/%s' % (guid[:2], guid) + guid_path = 'master/db/context/%s/%s' % (guid[:2], guid) assert exists(guid_path) self.assertEqual({ @@ -181,6 +111,9 @@ class NodeTest(tests.Test): class Routes(NodeRoutes): + def __init__(self, **kwargs): + NodeRoutes.__init__(self, 'node', **kwargs) + @route('GET', [None, None], cmd='probe1', acl=ACL.AUTH) def probe1(self, directory): pass @@ -208,6 +141,9 @@ class NodeTest(tests.Test): class Routes(NodeRoutes): + def __init__(self, **kwargs): + NodeRoutes.__init__(self, 'node', **kwargs) + @route('GET', [None, None], cmd='probe1', acl=ACL.AUTHOR) def probe1(self): pass @@ -260,6 +196,9 @@ class NodeTest(tests.Test): class Routes(NodeRoutes): + def __init__(self, **kwargs): + NodeRoutes.__init__(self, 'node', **kwargs) + @route('PROBE', acl=ACL.SUPERUSER) def probe(self): return 'ok' @@ -319,6 +258,9 @@ class NodeTest(tests.Test): class Routes(NodeRoutes): + def __init__(self, **kwargs): + NodeRoutes.__init__(self, 'node', **kwargs) + @route('PROBE', acl=ACL.SUPERUSER) def probe(self): pass @@ -338,6 +280,9 @@ class NodeTest(tests.Test): class Routes(NodeRoutes): + def __init__(self, **kwargs): + NodeRoutes.__init__(self, 'node', **kwargs) + @route('PROBE1', acl=ACL.AUTH) def probe1(self, request): pass @@ -479,27 +424,26 @@ class NodeTest(tests.Test): }) def test_PackagesRoute(self): - node.files_root.value = '.' - self.touch(('packages/repo/arch/package', 'file')) volume = self.start_master() client = Connection(auth=http.SugarAuth(keyfile.value)) - self.assertEqual(['repo'], client.get(['packages'])) - self.assertEqual(['arch'], client.get(['packages', 'repo'])) + self.touch(('master/files/packages/repo/arch/package', 'file')) + volume.blobs.populate() + + self.assertEqual([], client.get(['packages'])) + self.assertEqual([], client.get(['packages', 'repo'])) self.assertEqual(['package'], client.get(['packages', 'repo', 'arch'])) self.assertEqual('file', client.get(['packages', 'repo', 'arch', 'package'])) def test_PackageUpdatesRoute(self): - node.files_root.value = '.' - self.touch( - ('packages/repo/1', '', 1), - ('packages/repo/1.1', '', 1), - ('packages/repo/2', '', 2), - ('packages/repo/2.2', '', 2), - ) volume = self.start_master() ipc = Connection(auth=http.SugarAuth(keyfile.value)) + self.touch('master/files/packages/repo/1', 'master/files/packages/repo/1.1') + volume.blobs.populate() + self.touch('master/files/packages/repo/2', 'master/files/packages/repo/2.2') + volume.blobs.populate() + self.assertEqual( sorted(['1', '2']), sorted(ipc.get(['packages', 'repo', 'updates']))) @@ -552,7 +496,7 @@ class NodeTest(tests.Test): self.assertEqual({ release: { - 'seqno': 4, + 'seqno': 6, 'author': {tests.UID: {'name': tests.UID, 'order': 0, 'role': 3}}, 'value': { 'license': ['Public Domain'], @@ -580,7 +524,7 @@ class NodeTest(tests.Test): def test_Solve(self): volume = self.start_master() - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) activity_file = json.load(conn.request('POST', ['context'], self.zips(('topdir/activity/activity.info', '\n'.join([ @@ -623,7 +567,7 @@ class NodeTest(tests.Test): def test_SolveWithArguments(self): volume = self.start_master() - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) activity_file = json.load(conn.request('POST', ['context'], self.zips(('topdir/activity/activity.info', '\n'.join([ @@ -683,7 +627,7 @@ class NodeTest(tests.Test): def test_Clone(self): volume = self.start_master() - conn = http.Connection(api_url.value, http.SugarAuth(keyfile.value)) + conn = http.Connection(api.value, http.SugarAuth(keyfile.value)) activity_info = '\n'.join([ '[Activity]', diff --git a/tests/units/node/slave.py b/tests/units/node/slave.py new file mode 100755 index 0000000..6e75602 --- /dev/null +++ b/tests/units/node/slave.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +import json +import time +import hashlib +from os.path import exists + +from __init__ import tests + +from sugar_network import db, toolkit +from sugar_network.client import Connection, keyfile +from sugar_network.node import master_api +from sugar_network.node.master import MasterRoutes +from sugar_network.node.slave import SlaveRoutes +from sugar_network.db.volume import Volume +from sugar_network.model.user import User +from sugar_network.toolkit.router import Router, File +from sugar_network.toolkit import coroutine, http, parcel + + +class SlaveTest(tests.Test): + + def setUp(self): + tests.Test.setUp(self) + + class Document(db.Resource): + + @db.indexed_property(db.Localized, slot=1, prefix='N', full_text=True) + def title(self, value): + return value + + @db.indexed_property(db.Localized, prefix='D', full_text=True) + def message(self, value): + return value + + @db.stored_property(db.Blob) + def blob(self, value): + return value + + self.Document = Document + self.slave_volume = Volume('slave', [User, Document]) + self.slave_routes = SlaveRoutes(volume=self.slave_volume) + self.slave_server = coroutine.WSGIServer(('127.0.0.1', 8888), Router(self.slave_routes)) + coroutine.spawn(self.slave_server.serve_forever) + coroutine.dispatch() + + def test_online_sync_Push(self): + self.fork_master([User, self.Document]) + master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value)) + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + slave.post(cmd='online_sync') + self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges'))) + + guid1 = slave.post(['document'], {'message': '1', 'title': ''}) + guid2 = slave.post(['document'], {'message': '2', 'title': ''}) + + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'message': '1'}, + {'guid': guid2, 'message': '2'}, + ], + master.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[2, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges'))) + + guid3 = slave.post(['document'], {'message': '3', 'title': ''}) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'message': '1'}, + {'guid': guid2, 'message': '2'}, + {'guid': guid3, 'message': '3'}, + ], + master.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[3, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + slave.put(['document', guid2], {'message': '22'}) + slave.post(cmd='online_sync') + self.assertEqual('22', master.get(['document', guid2, 'message'])) + self.assertEqual([[4, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[7, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + slave.delete(['document', guid1]) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid2, 'message': '22'}, + {'guid': guid3, 'message': '3'}, + ], + master.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[5, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[8, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + slave.put(['document', guid1], {'message': 'a'}) + slave.put(['document', guid2], {'message': 'b'}) + slave.put(['document', guid3], {'message': 'c'}) + guid4 = slave.post(['document'], {'message': 'd', 'title': ''}) + slave.delete(['document', guid2]) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid3, 'message': 'c'}, + {'guid': guid4, 'message': 'd'}, + ], + master.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[13, None]], json.load(file('slave/var/push.ranges'))) + + def test_online_sync_Pull(self): + self.fork_master([User, self.Document]) + master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value)) + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + slave.post(cmd='online_sync') + self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges'))) + + guid1 = master.post(['document'], {'message': '1', 'title': ''}) + guid2 = master.post(['document'], {'message': '2', 'title': ''}) + + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'message': '1'}, + {'guid': guid2, 'message': '2'}, + ], + slave.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[5, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges'))) + + guid3 = master.post(['document'], {'message': '3', 'title': ''}) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'message': '1'}, + {'guid': guid2, 'message': '2'}, + {'guid': guid3, 'message': '3'}, + ], + slave.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[3, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + master.put(['document', guid2], {'message': '22'}) + slave.post(cmd='online_sync') + self.assertEqual('22', slave.get(['document', guid2, 'message'])) + self.assertEqual([[7, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + master.delete(['document', guid1]) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid2, 'message': '22'}, + {'guid': guid3, 'message': '3'}, + ], + slave.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[8, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges'))) + + coroutine.sleep(1) + master.put(['document', guid1], {'message': 'a'}) + master.put(['document', guid2], {'message': 'b'}) + master.put(['document', guid3], {'message': 'c'}) + guid4 = master.post(['document'], {'message': 'd', 'title': ''}) + master.delete(['document', guid2]) + slave.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid3, 'message': 'c'}, + {'guid': guid4, 'message': 'd'}, + ], + slave.get(['document'], reply=['guid', 'message'])['result']) + self.assertEqual([[13, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges'))) + + def test_online_sync_PullBlobs(self): + self.fork_master([User, self.Document]) + master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value)) + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + slave.post(cmd='online_sync') + self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges'))) + + guid = master.post(['document'], {'message': '1', 'title': ''}) + master.put(['document', guid, 'blob'], 'blob') + self.touch(('master/files/foo/bar', 'file')) + + slave.post(cmd='online_sync') + self.assertEqual('blob', slave.request('GET', ['document', guid, 'blob']).content) + self.assertEqual('file', file('slave/files/foo/bar').read()) + + def test_online_sync_PullFromPreviouslyMergedRecord(self): + self.fork_master([User, self.Document]) + master = Connection('http://127.0.0.1:7777', auth=http.SugarAuth(keyfile.value)) + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + slave.post(cmd='online_sync') + self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[1, None]], json.load(file('slave/var/push.ranges'))) + + guid = slave.post(['document'], {'message': '1', 'title': '1'}) + slave.post(cmd='online_sync') + + coroutine.sleep(1) + master.put(['document', guid], {'message': '1_'}) + slave.put(['document', guid], {'title': '1_'}) + slave.post(cmd='online_sync') + + self.assertEqual('1_', master.get(['document', guid, 'message'])) + self.assertEqual('1_', master.get(['document', guid, 'title'])) + self.assertEqual('1_', slave.get(['document', guid, 'message'])) + self.assertEqual('1_', slave.get(['document', guid, 'title'])) + + def test_offline_sync_Import(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + self.touch(('blob1', 'a')) + self.touch(('blob2', 'bb')) + parcel.encode_dir([ + ('push', {'from': '127.0.0.1:7777'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + File('./blob1', meta={'content-length': '1'}), + File('./blob2', meta={'content-length': '2', 'path': 'foo/bar'}), + {'commit': [[1, 2]]}, + ]), + ('ack', {'ack': [[101, 103]], 'ranges': [[1, 3]], 'from': '127.0.0.1:7777', 'to': self.slave_routes.guid}, []), + ], + root='sync', limit=99999999) + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + + self.assertEqual(1, slave.get(['document', '1', 'ctime'])) + self.assertEqual('a', file(self.slave_volume.blobs.get(hashlib.sha1('a').hexdigest()).path).read()) + self.assertEqual('bb', file(self.slave_volume.blobs.get('foo/bar').path).read()) + self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges'))) + self.assertEqual([[3, 100], [104, None]], json.load(file('slave/var/pull.ranges'))) + + self.assertEqual( + sorted([ + ({'from': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + {'content-length': '1'}, + {'content-length': '2', 'path': 'foo/bar'}, + {'commit': [[1, 2]]}, + ]), + ({'ack': [[101, 103]], 'from': '127.0.0.1:7777', 'packet': 'ack', 'ranges': [[1, 3]], 'to': self.slave_routes.guid}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[3, 100], [104, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + def test_offline_sync_ImportPush(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + self.touch(('blob1', 'a')) + self.touch(('blob2', 'bb')) + parcel.encode_dir([ + ('push', {'from': '127.0.0.1:7777'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + File('./blob1', meta={'content-length': '1'}), + File('./blob2', meta={'content-length': '2', 'path': 'foo/bar'}), + {'commit': [[1, 2]]}, + ]), + ], + root='sync', limit=99999999) + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + + self.assertEqual(1, slave.get(['document', '1', 'ctime'])) + self.assertEqual('a', file(self.slave_volume.blobs.get(hashlib.sha1('a').hexdigest()).path).read()) + self.assertEqual('bb', file(self.slave_volume.blobs.get('foo/bar').path).read()) + self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges'))) + self.assertEqual([[3, None]], json.load(file('slave/var/pull.ranges'))) + + self.assertEqual( + sorted([ + ({'from': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + {'content-length': '1'}, + {'content-length': '2', 'path': 'foo/bar'}, + {'commit': [[1, 2]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[3, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + def test_offline_sync_ImportAck(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + parcel.encode_dir([ + ('ack', {'ack': [[101, 103]], 'ranges': [[1, 3]], 'from': '127.0.0.1:7777', 'to': self.slave_routes.guid}, []), + ], + root='sync', limit=99999999) + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + + self.assertEqual([[4, None]], json.load(file('slave/var/push.ranges'))) + self.assertEqual([[1, 100], [104, None]], json.load(file('slave/var/pull.ranges'))) + + self.assertEqual( + sorted([ + ({'ack': [[101, 103]], 'from': '127.0.0.1:7777', 'packet': 'ack', 'ranges': [[1, 3]], 'to': self.slave_routes.guid}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, 100], [104, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + def test_offline_sync_GenerateRequestAfterImport(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + parcel.encode_dir([ + ('push', {'from': 'another-slave'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + {'commit': [[1, 1]]}, + ]), + ], + root='sync', limit=99999999) + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + + self.assertEqual(1, slave.get(['document', '1', 'ctime'])) + self.assertEqual([[2, None]], json.load(file('slave/var/push.ranges'))) + self.assertEqual([[1, None]], json.load(file('slave/var/pull.ranges'))) + + self.assertEqual( + sorted([ + ({'from': 'another-slave', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}, + 'title': {'value': {}, 'mtime': 0}, + 'message': {'value': {}, 'mtime': 0}, + }}, + {'commit': [[1, 1]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'request', 'to': '127.0.0.1:7777', 'origin': 'another-slave', 'ranges': [[1, 1]]}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'push', 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + def test_offline_sync_Export(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + class statvfs(object): + + f_bfree = None + f_frsize = 1 + + self.override(os, 'statvfs', lambda *args: statvfs()) + statvfs.f_bfree = 999999999 + self.override(time, 'time', lambda: 0) + + guid = slave.post(['document'], {'message': '', 'title': ''}) + push_seqno = self.slave_volume.seqno.value + 1 + self.slave_routes._push_r.value = [[push_seqno, None]] + slave.put(['document', guid, 'title'], 'probe') + self.slave_volume.blobs.post('a') + self.touch(('slave/files/foo/bar', 'bb')) + self.slave_volume.blobs.populate() + + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + + self.assertEqual( + sorted([ + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': 'probe'}, 'mtime': self.slave_volume['document'].get(guid).meta('title')['mtime']}, + }}, + {'resource': 'user'}, + {'content-length': '1', 'content-type': 'application/octet-stream'}, + {'commit': [[push_seqno, push_seqno + 1]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + def test_offline_sync_ContinuousExport(self): + slave = Connection('http://127.0.0.1:8888', auth=http.SugarAuth(keyfile.value)) + + class statvfs(object): + + f_bfree = None + f_frsize = 1 + + self.override(os, 'statvfs', lambda *args: statvfs()) + self.override(time, 'time', lambda: 0) + + guid1 = slave.post(['document'], {'message': '', 'title': ''}) + guid2 = slave.post(['document'], {'message': '', 'title': ''}) + push_seqno = self.slave_volume.seqno.value + 1 + self.slave_routes._push_r.value = [[push_seqno, None]] + + RECORD = 1024 * 1024 + slave.put(['document', guid1, 'title'], '.' * RECORD) + slave.put(['document', guid2, 'title'], '.' * RECORD) + statvfs.f_bfree = parcel._RESERVED_DISK_SPACE + RECORD * 1.5 + + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + self.assertEqual( + sorted([ + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid1, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']}, + }}, + {'commit': [[push_seqno, push_seqno]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + self.assertEqual( + sorted([ + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid1, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']}, + }}, + {'commit': [[push_seqno, push_seqno]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid2, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid2).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid2).meta('title')['mtime']}, + }}, + {'resource': 'user'}, + {'commit': [[push_seqno + 1, push_seqno + 1]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + slave.post(cmd='offline_sync', path=tests.tmpdir + '/sync') + self.assertEqual( + sorted([ + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid1, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid1).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid1).meta('title')['mtime']}, + }}, + {'commit': [[push_seqno, push_seqno]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'guid': guid2, 'patch': { + 'mtime': {'value': 0, 'mtime': self.slave_volume['document'].get(guid2).meta('mtime')['mtime']}, + 'title': {'value': {'en-us': '.' * RECORD}, 'mtime': self.slave_volume['document'].get(guid2).meta('title')['mtime']}, + }}, + {'resource': 'user'}, + {'commit': [[push_seqno + 1, push_seqno + 1]]}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ({'from': self.slave_routes.guid, 'to': '127.0.0.1:7777', 'packet': u'push'}, [ + {'resource': 'document'}, + {'resource': 'user'}, + ]), + ({'from': self.slave_routes.guid, 'packet': 'pull', 'ranges': [[1, None]], 'to': '127.0.0.1:7777'}, [ + ]), + ]), + sorted([(packet.header, [i for i in packet]) for packet in parcel.decode_dir('sync')])) + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/node/stats_user.py b/tests/units/node/stats_user.py deleted file mode 100755 index 49275b5..0000000 --- a/tests/units/node/stats_user.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import json -import time - -from __init__ import tests - -from sugar_network.toolkit.rrd import Rrd -from sugar_network.node.stats_user import stats_user_step, stats_user_rras, diff, merge, commit - - -class StatsTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - stats_user_step.value = 1 - stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100'] - - def test_diff(self): - ts = int(time.time()) - - rrd = Rrd('stats/user/dir1/user1', stats_user_step.value, stats_user_rras.value) - rrd['db1'].put({'a': 1}, ts) - rrd['db1'].put({'a': 2}, ts + 1) - - rrd = Rrd('stats/user/dir1/user2', stats_user_step.value, stats_user_rras.value) - rrd['db2'].put({'b': 3}, ts) - - rrd = Rrd('stats/user/dir2/user3', stats_user_step.value, stats_user_rras.value) - rrd['db3'].put({'c': 4}, ts) - rrd['db4'].put({'d': 5}, ts) - - self.assertEqual([ - {'db': 'db3', 'user': 'user3'}, - {'timestamp': ts, 'values': {'c': 4.0}}, - {'db': 'db4', 'user': 'user3'}, - {'timestamp': ts, 'values': {'d': 5.0}}, - {'db': 'db2', 'user': 'user2'}, - {'timestamp': ts, 'values': {'b': 3.0}}, - {'db': 'db1', 'user': 'user1'}, - {'timestamp': ts, 'values': {'a': 1.0}}, - {'timestamp': ts + 1, 'values': {'a': 2.0}}, - {'commit': { - 'user1': { - 'db1': [[1, ts + 1]], - }, - 'user2': { - 'db2': [[1, ts]], - }, - 'user3': { - 'db3': [[1, ts]], - 'db4': [[1, ts]], - }, - }}, - ], - [i for i in diff()]) - - def test_merge(self): - ts = int(time.time()) - - self.assertEqual( - 'info', - merge([ - {'db': 'db3', 'user': 'user3'}, - {'timestamp': ts, 'values': {'c': 4.0}}, - {'db': 'db4', 'user': 'user3'}, - {'timestamp': ts, 'values': {'d': 5.0}}, - {'db': 'db2', 'user': 'user2'}, - {'timestamp': ts, 'values': {'b': 3.0}}, - {'db': 'db1', 'user': 'user1'}, - {'timestamp': ts, 'values': {'a': 1.0}}, - {'timestamp': ts + 1, 'values': {'a': 2.0}}, - {'commit': 'info'}, - ])) - - self.assertEqual([ - [('db1', ts, {'a': 1.0}), ('db1', ts + 1, {'a': 2.0})], - ], - [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user1', 1)]) - - self.assertEqual([ - [('db2', ts, {'b': 3.0})], - ], - [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user2', 1)]) - - self.assertEqual([ - [('db3', ts, {'c': 4.0})], - [('db4', ts, {'d': 5.0})], - ], - [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user3', 1)]) - - def test_commit(self): - ts = int(time.time()) - commit({ - 'user1': { - 'db1': [[1, ts + 1]], - }, - 'user2': { - 'db2': [[1, ts]], - }, - 'user3': { - 'db3': [[1, ts]], - 'db4': [[1, ts]], - }, - }) - - self.assertEqual( - [[ts + 2, None]], - json.load(file('stats/user/us/user1/db1.push'))) - self.assertEqual( - [[ts + 1, None]], - json.load(file('stats/user/us/user2/db2.push'))) - self.assertEqual( - [[ts + 1, None]], - json.load(file('stats/user/us/user3/db3.push'))) - self.assertEqual( - [[ts + 1, None]], - json.load(file('stats/user/us/user3/db4.push'))) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/node/sync_master.py b/tests/units/node/sync_master.py deleted file mode 100755 index c946396..0000000 --- a/tests/units/node/sync_master.py +++ /dev/null @@ -1,668 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import gzip -import time -import json -import base64 -import hashlib -from glob import glob -from os.path import join, exists -from StringIO import StringIO - -import rrdtool - -from __init__ import tests - -from sugar_network.db.directory import Directory -from sugar_network import db, node, toolkit -from sugar_network.node import sync -from sugar_network.node.master import MasterRoutes -from sugar_network.db.volume import Volume -from sugar_network.toolkit import coroutine -from sugar_network.toolkit.rrd import Rrd -from sugar_network.toolkit.router import Response - - -class statvfs(object): - - f_bfree = None - f_frsize = 1 - - -class SyncMasterTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - - self.uuid = 0 - self.override(toolkit, 'uuid', self.next_uuid) - self.override(os, 'statvfs', lambda *args: statvfs()) - statvfs.f_bfree = 999999999 - - class Document(db.Resource): - - @db.indexed_property(slot=1, default='') - def prop(self, value): - return value - - node.files_root.value = 'sync' - self.volume = Volume('master', [Document]) - self.master = MasterRoutes('127.0.0.1:8888', self.volume) - - def next_uuid(self): - self.uuid += 1 - return str(self.uuid) - - def test_sync_ExcludeRecentlyMergedDiffFromPull(self): - request = Request() - for chunk in sync.encode([ - ('diff', None, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1}, - 'ctime': {'value': 1, 'mtime': 1}, - 'mtime': {'value': 1, 'mtime': 1}, - 'prop': {'value': 'value', 'mtime': 1}, - }}, - {'commit': [[1, 1]]}, - ]), - ('pull', {'sequence': [[1, None]]}, None), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = StringIO() - for chunk in self.master.sync(request): - response.write(chunk) - response.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': None}, []), - ({'packet': 'diff', 'src': '127.0.0.1:8888'}, [{'resource': 'document'}, {'commit': []}]), - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(response)]) - - request = Request() - for chunk in sync.encode([ - ('pull', {'sequence': [[1, None]]}, None), - ('diff', None, [ - {'resource': 'document'}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 2}, - 'ctime': {'value': 2, 'mtime': 2}, - 'mtime': {'value': 2, 'mtime': 2}, - 'prop': {'value': 'value', 'mtime': 2}, - }}, - {'commit': [[2, 2]]}, - ]), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = StringIO() - for chunk in self.master.sync(request): - response.write(chunk) - response.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[2, 2]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': None}, []), - ({'packet': 'diff', 'src': '127.0.0.1:8888'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1}, - 'ctime': {'value': 1, 'mtime': 1}, - 'mtime': {'value': 1, 'mtime': 1}, - 'prop': {'value': 'value', 'mtime': 1}, - }}, - {'commit': [[1, 1]]}, - ]), - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(response)]) - - def test_sync_MisaddressedPackets(self): - request = Request() - for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)]): - request.content_stream.write(chunk) - request.content_stream.seek(0) - self.assertRaises(RuntimeError, lambda: next(self.master.sync(request))) - - request = Request() - for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)], dst='fake'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - self.assertRaises(RuntimeError, lambda: next(self.master.sync(request))) - - request = Request() - for chunk in sync.encode([('pull', {'sequence': [[1, None]]}, None)], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - next(self.master.sync(request)) - - def test_push_WithoutCookies(self): - ts = int(time.time()) - - request = Request() - for chunk in sync.package_encode([ - ('diff', None, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1}, - 'ctime': {'value': 1, 'mtime': 1}, - 'mtime': {'value': 1, 'mtime': 1}, - 'prop': {'value': 'value', 'mtime': 1}, - }}, - {'commit': [[1, 1]]}, - ]), - ('stats_diff', {'dst': '127.0.0.1:8888'}, [ - {'db': 'db', 'user': 'user'}, - {'timestamp': ts, 'values': {'field': 1.0}}, - {'commit': {'user': {'db': [[1, ts]]}}}, - ]), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': None, 'filename': '2.sneakernet'}, []), - ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[1, ts]]}}, 'src': '127.0.0.1:8888', 'dst': None, 'filename': '2.sneakernet'}, []), - ], - [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - - def test_push_WithCookies(self): - ts = int(time.time()) - - request = Request() - for chunk in sync.package_encode([ - ('pull', {'sequence': [[1, None]]}, None), - ('files_pull', {'sequence': [[1, None]]}, None), - ('diff', None, [ - {'resource': 'document'}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 2}, - 'ctime': {'value': 2, 'mtime': 2}, - 'mtime': {'value': 2, 'mtime': 2}, - 'prop': {'value': 'value', 'mtime': 2}, - }}, - {'commit': [[2, 2]]}, - ]), - ('stats_diff', {'dst': '127.0.0.1:8888'}, [ - {'db': 'db', 'user': 'user'}, - {'timestamp': ts + 1, 'values': {'field': 2.0}}, - {'commit': {'user': {'db': [[2, ts]]}}}, - ]), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': None, 'filename': '2.sneakernet'}, []), - ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[2, ts]]}}, 'src': '127.0.0.1:8888', 'dst': None, 'filename': '2.sneakernet'}, []), - ], - [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({None: [[1, 1]]})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - def test_push_CollectCookies(self): - request = Request() - request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \ - base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])) - for chunk in sync.package_encode([ - ('pull', {'sequence': [[11, None]]}, None), - ('pull', {'sequence': [[1, 2]]}, None), - ('files_pull', {'sequence': [[11, None]]}, None), - ('files_pull', {'sequence': [[3, 4]]}, None), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, 2], [10, None]]), ('files_pull', None, [[3, 4], [10, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({None: []})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - request = Request() - request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \ - base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])) - for chunk in sync.package_encode([ - ('pull', {'sequence': [[1, 5]]}, None), - ('files_pull', {'sequence': [[1, 5]]}, None), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, 5], [10, None]]), ('files_pull', None, [[1, 5], [10, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({None: []})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - def test_push_DoNotExcludeAcksFromCookies(self): - ts = int(time.time()) - - request = Request() - for chunk in sync.package_encode([ - ('pull', {'sequence': [[1, None]]}, None), - ('diff', None, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1}, - 'ctime': {'value': 1, 'mtime': 1}, - 'mtime': {'value': 1, 'mtime': 1}, - 'prop': {'value': 'value', 'mtime': 1}, - }}, - {'commit': [[10, 10]]}, - ]), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[10, 10]], 'dst': None, 'filename': '2.sneakernet'}, []), - ], - [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({None: [[1, 1]]})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - def test_pull(self): - self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}) - self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2}) - self.utime('master', 0) - self.touch(('sync/1', 'file1')) - self.touch(('sync/2', 'file2')) - - request = Request() - request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])) - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - coroutine.sleep(.5) - - request = Request() - request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0] - response = Response() - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'diff'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'prop': {'value': '1', 'mtime': 0}, - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}}, - }, - {'guid': '2', 'diff': { - 'prop': {'value': '2', 'mtime': 0}, - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}}, - }, - {'commit': [[1, 2]]}, - ]) - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - request = Request() - request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0] - response = Response() - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - packets_iter = sync.decode(gzip.GzipFile(mode='r', fileobj=reply)) - with next(packets_iter) as packet: - self.assertEqual('files_diff', packet.name) - records_iter = iter(packet) - self.assertEqual('file1', next(records_iter)['blob'].read()) - self.assertEqual('file2', next(records_iter)['blob'].read()) - self.assertEqual({'op': 'commit', 'sequence': [[1, 4]]}, next(records_iter)) - self.assertRaises(StopIteration, records_iter.next) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - - def test_pull_EmptyPackets(self): - self.master._pulls = { - 'pull': lambda layer, in_seq, out_seq=None, exclude_seq=None, **kwargs: \ - ('diff', None, [{'layer': layer, 'seq': in_seq}]), - } - - request = Request() - request.environ['HTTP_COOKIE'] = 'sugar_network_pull=%s' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]])])) - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - coroutine.sleep(.5) - self.assertEqual(1, len([i for i in glob('tmp/pulls/*.tag')])) - - request = Request() - request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0] - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - self.assertEqual(0, len([i for i in glob('tmp/pulls/*.tag')])) - - def test_pull_FullClone(self): - - def diff(layer, seq, out_seq): - out_seq.include(1, 10) - yield {'layer': layer, 'seq': seq} - - self.master._pulls = { - 'pull': lambda layer, in_seq, out_seq, exclude_seq=None, **kwargs: ('diff', None, diff(layer, in_seq, out_seq)), - 'files_pull': lambda layer, in_seq, out_seq, exclude_seq=None, **kwargs: ('files_diff', None, diff(layer, in_seq, out_seq)), - } - - request = Request() - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - coroutine.sleep(.5) - - request = Request() - request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0] - response = Response() - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'diff'}, [{'layer': None, 'seq': [[1, None]]}]), - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - request = Request() - request.environ['HTTP_COOKIE'] = response.get('set-cookie')[0] - response = Response() - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'files_diff'}, [{'layer': None, 'seq': [[1, None]]}]), - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - - def test_push_SetSentCookies(self): - request = Request() - for chunk in sync.package_encode([ - ('pull', {'src': '1', 'sequence': [[1, None]], 'layer': '1'}, None), - ('pull', {'src': '1', 'sequence': [[11, None]], 'layer': '1'}, None), - ('pull', {'src': '2', 'sequence': [[2, None]], 'layer': '2'}, None), - ('pull', {'src': '2', 'sequence': [[22, None]], 'layer': '2'}, None), - ('diff', {'src': '3'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 1}, - 'ctime': {'value': 1, 'mtime': 1}, - 'mtime': {'value': 1, 'mtime': 1}, - 'prop': {'value': 'value', 'mtime': 1}, - }}, - {'commit': [[1, 1]]}, - ]), - ('diff', {'src': '3'}, [ - {'resource': 'document'}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 2}, - 'ctime': {'value': 2, 'mtime': 2}, - 'mtime': {'value': 2, 'mtime': 2}, - 'prop': {'value': 'value', 'mtime': 2}, - }}, - {'commit': [[2, 2]]}, - ]), - ], dst='127.0.0.1:8888'): - request.content_stream.write(chunk) - request.content_stream.seek(0) - - response = Response() - reply = StringIO() - for chunk in self.master.push(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'ack', 'ack': [[1, 1]], 'src': '127.0.0.1:8888', 'sequence': [[1, 1]], 'dst': '3', 'filename': '2.sneakernet'}, []), - ({'packet': 'ack', 'ack': [[2, 2]], 'src': '127.0.0.1:8888', 'sequence': [[2, 2]], 'dst': '3', 'filename': '2.sneakernet'}, []), - ], - [(packet.props, [i for i in packet]) for packet in sync.package_decode(reply)]) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', '1', [[1, None]]), ('pull', '2', [[2, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({'1': [], '2': [], '3': [[1, 2]]})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - - def test_pull_ExcludeSentCookies(self): - self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}) - self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2}) - self.utime('master', 0) - - request = Request() - request.environ['HTTP_COOKIE'] = { - 'sugar_network_pull': base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent': base64.b64encode(json.dumps({'slave': [[2, 2]]})), - } - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({'slave': [[2, 2]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - coroutine.sleep(.5) - - request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(response.get('set-cookie')) - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'diff'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'prop': {'value': '1', 'mtime': 0}, - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}}, - }, - {'commit': [[1, 1]]}, - ]) - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - - def test_pull_DoNotExcludeSentCookiesForMultipleNodes(self): - self.volume['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}) - self.volume['document'].create({'guid': '2', 'prop': '2', 'ctime': 2, 'mtime': 2}) - self.utime('master', 0) - - request = Request() - request.environ['HTTP_COOKIE'] = { - 'sugar_network_pull': base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent': base64.b64encode(json.dumps({'slave': [[2, 2]], 'other': []})), - } - response = Response() - self.assertEqual(None, self.master.pull(request, response)) - self.assertEqual([ - 'sugar_network_pull=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps([('pull', None, [[1, None]])])), - 'sugar_network_sent=%s; Max-Age=3600; HttpOnly' % \ - base64.b64encode(json.dumps({'slave': [[2, 2]], 'other': []})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ], - response.get('set-cookie')) - coroutine.sleep(.5) - - request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(response.get('set-cookie')) - reply = StringIO() - for chunk in self.master.pull(request, response): - reply.write(chunk) - reply.seek(0) - self.assertEqual([ - ({'packet': 'diff'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'prop': {'value': '1', 'mtime': 0}, - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}}, - }, - {'guid': '2', 'diff': { - 'prop': {'value': '2', 'mtime': 0}, - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}}, - }, - {'commit': [[1, 2]]}, - ]) - ], - [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) - self.assertEqual([ - 'sugar_network_pull=unset_sugar_network_pull; Max-Age=0; HttpOnly', - 'sugar_network_sent=unset_sugar_network_sent; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('set-cookie')) - - def __test_pull_LimittedPull(self): - pass - - def __test_pull_ReusePullSeqFromCookies(self): - pass - - def __test_pull_AskForNotYetReadyPull(self): - pass - - def __test_pull_ProcessFilePulls(self): - pass - - def __test_ReuseCachedPulls(self): - pass - - -class Request(object): - - def __init__(self): - self.content_stream = StringIO() - self.environ = {} - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/node/sync_offline.py b/tests/units/node/sync_offline.py deleted file mode 100755 index 1c4ffd5..0000000 --- a/tests/units/node/sync_offline.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import time -import json -import uuid -from os.path import exists, join - -import rrdtool - -from __init__ import tests - -from sugar_network import db, node, toolkit -from sugar_network.toolkit.rrd import Rrd -from sugar_network.client import api_url -from sugar_network.node import sync, stats_user, files_root -from sugar_network.node.slave import SlaveRoutes -from sugar_network.db import Volume -from sugar_network.toolkit import coroutine - - -class statvfs(object): - - f_bfree = None - f_frsize = 1 - - -class SyncOfflineTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - self.uuid = 0 - self.override(toolkit, 'uuid', self.next_uuid) - self.override(os, 'statvfs', lambda *args: statvfs()) - statvfs.f_bfree = 999999999 - stats_user.stats_user_step.value = 1 - stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100'] - node.sync_layers.value = 'pilot' - - def next_uuid(self): - self.uuid += 1 - return str(self.uuid) - - def test_FailOnFullDump(self): - - class Document(db.Resource): - pass - - volume = Volume('node', [Document]) - cp = SlaveRoutes('node/key', volume) - - node.sync_layers.value = None - self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt') - node.sync_layers.value = 'public' - cp.offline_sync(tests.tmpdir + '/mnt') - - def test_Export(self): - - class Document(db.Resource): - pass - - volume = Volume('node', [Document]) - cp = SlaveRoutes('node/key', volume) - stats_user.stats_user.value = True - - volume['document'].create({'guid': '1', 'prop': 'value1', 'ctime': 1, 'mtime': 1}) - volume['document'].create({'guid': '2', 'prop': 'value2', 'ctime': 2, 'mtime': 2}) - self.utime('node', 0) - - ts = int(time.time()) - rrd = Rrd('stats/user/dir/user', stats_user.stats_user_step.value, stats_user.stats_user_rras.value) - rrd['db'].put({'field': 1}, ts) - - cp.offline_sync(tests.tmpdir + '/mnt') - assert cp._offline_session is None - - self.assertEqual([ - ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}, - }}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}, - }}, - {'commit': [[1, 2]]}, - ]), - ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [ - {'db': 'db', 'user': 'user'}, - {'timestamp': ts, 'values': {'field': 1.0}}, - {'commit': {'user': {'db': [[1, ts]]}}}, - ]), - ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet'}, []), - ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet', 'layer': ['pilot']}, []), - ], - sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('mnt')])) - assert not exists('node/pull.sequence') - assert not exists('node/push.sequence') - - def test_ContinuesExport(self): - payload = ''.join([str(uuid.uuid4()) for i in xrange(5000)]) - - class Document(db.Resource): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = Volume('node', [Document]) - cp = SlaveRoutes('node/key', volume) - stats_user.stats_user.value = True - - volume['document'].create({'guid': '1', 'prop': payload, 'ctime': 1, 'mtime': 1}) - volume['document'].create({'guid': '2', 'prop': payload, 'ctime': 2, 'mtime': 2}) - self.utime('node', 0) - - ts = int(time.time()) - rrd = Rrd('stats/user/dir/user', stats_user.stats_user_step.value, stats_user.stats_user_rras.value) - rrd['db'].put({'field': 1}, ts) - - statvfs.f_bfree = len(payload) * 1.5 + sync._SNEAKERNET_RESERVED_SIZE - cp.offline_sync(tests.tmpdir + '/1') - assert cp._offline_session is not None - - self.assertEqual([ - ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '2.sneakernet'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}, - 'prop': {'value': payload, 'mtime': 0}, - }}, - {'commit': [[1, 1]]}, - ]), - ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet'}, []), - ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'sequence': [[1, None]], 'filename': '2.sneakernet', 'layer': ['pilot']}, []), - ], - sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('1')])) - - statvfs.f_bfree = 999999999 - cp.offline_sync(tests.tmpdir + '/2') - assert cp._offline_session is None - - self.assertEqual([ - ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '3.sneakernet'}, [ - {'resource': 'document'}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}, - 'prop': {'value': payload, 'mtime': 0}, - }}, - {'commit': [[2, 2]]}, - ]), - ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '1', 'filename': '3.sneakernet'}, [ - {'db': 'db', 'user': 'user'}, - {'timestamp': ts, 'values': {'field': 1.0}}, - {'commit': {'user': {'db': [[1, ts]]}}}, - ]), - ], - sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('2')])) - - statvfs.f_bfree = 999999999 - cp.offline_sync(tests.tmpdir + '/3') - assert cp._offline_session is None - - self.assertEqual([ - ({'packet': 'diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'filename': '5.sneakernet'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}, - 'prop': {'value': payload, 'mtime': 0}, - }}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}, - 'prop': {'value': payload, 'mtime': 0}, - }}, - {'commit': [[1, 2]]}, - ]), - ({'packet': 'stats_diff', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'filename': '5.sneakernet'}, [ - {'db': 'db', 'user': 'user'}, - {'timestamp': ts, 'values': {'field': 1.0}}, - {'commit': {'user': {'db': [[1, ts]]}}}, - ]), - ({'packet': 'files_pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'sequence': [[1, None]], 'filename': '5.sneakernet'}, []), - ({'packet': 'pull', 'src': cp.guid, 'dst': '127.0.0.1:8888', 'api_url': 'http://127.0.0.1:8888', 'session': '4', 'sequence': [[1, None]], 'filename': '5.sneakernet', 'layer': ['pilot']}, []), - ], - sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('3')])) - - def test_Import(self): - class Document(db.Resource): - pass - - volume = Volume('node', [Document]) - cp = SlaveRoutes('node/key', volume) - stats_user.stats_user.value = True - files_root.value = 'files' - - ts = int(time.time()) - sync.sneakernet_encode([ - ('diff', {'src': '127.0.0.1:8888'}, [ - {'resource': 'document'}, - {'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': 0}, - 'ctime': {'value': 1, 'mtime': 0}, - 'mtime': {'value': 1, 'mtime': 0}, - }}, - {'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': 0}, - 'ctime': {'value': 2, 'mtime': 0}, - 'mtime': {'value': 2, 'mtime': 0}, - }}, - {'commit': [[1, 2]]}, - ]), - ('files_diff', {'src': '127.0.0.1:8888'}, [ - {'op': 'update', 'blob_size': 1, 'blob': ['a'], 'path': '1'}, - {'op': 'update', 'blob_size': 2, 'blob': ['bb'], 'path': '2'}, - {'op': 'commit', 'sequence': [[1, 2]]}, - ]), - ('ack', {'ack': [[101, 103]], 'sequence': [[1, 3]], 'src': '127.0.0.1:8888', 'dst': cp.guid}, []), - ('stats_ack', {'sequence': {'user': {'db': [[1, ts]]}}, 'src': '127.0.0.1:8888', 'dst': cp.guid}, []), - ], - root='mnt') - - cp.offline_sync(tests.tmpdir + '/mnt') - assert cp._offline_session is None - - self.assertEqual( - ['1', '2'], - [i.guid for i in volume['document'].find()[0]]) - self.assertEqual('a', file('files/1').read()) - self.assertEqual('bb', file('files/2').read()) - self.assertEqual([[4, None]], json.load(file('node/push.sequence'))) - self.assertEqual([[3, 100], [104, None]], json.load(file('node/pull.sequence'))) - self.assertEqual([[3, None]], json.load(file('node/files.sequence'))) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/node/sync_online.py b/tests/units/node/sync_online.py deleted file mode 100755 index e2c864a..0000000 --- a/tests/units/node/sync_online.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import json -from os.path import exists - -from __init__ import tests - -from sugar_network import db, toolkit -from sugar_network.client import Connection, api_url, keyfile -from sugar_network.node import sync, stats_user, files_root -from sugar_network.node.master import MasterRoutes -from sugar_network.node.slave import SlaveRoutes -from sugar_network.db.volume import Volume -from sugar_network.model.user import User -from sugar_network.toolkit.router import Router -from sugar_network.toolkit import coroutine, http - - -class SyncOnlineTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - - self.stats_commit = [] - self.stats_merge = [] - def stats_diff(): - yield {'stats': 'probe'} - self.override(stats_user, 'diff', stats_diff) - def stats_merge(packet): - self.stats_merge.extend([i for i in packet]) - return 'ok' - self.override(stats_user, 'merge', stats_merge) - self.override(stats_user, 'commit', lambda seq: self.stats_commit.append(seq)) - - class Document(db.Resource): - - @db.indexed_property(prefix='C') - def context(self, value): - return value - - @db.indexed_property(prefix='T') - def type(self, value): - return value - - @db.indexed_property(db.Localized, slot=1, prefix='N', full_text=True) - def title(self, value): - return value - - @db.indexed_property(db.Localized, prefix='D', full_text=True) - def message(self, value): - return value - - api_url.value = 'http://127.0.0.1:9000' - - files_root.value = 'master/files' - self.master_volume = Volume('master', [User, Document]) - self.master_server = coroutine.WSGIServer(('127.0.0.1', 9000), Router(MasterRoutes('127.0.0.1:9000', self.master_volume))) - coroutine.spawn(self.master_server.serve_forever) - coroutine.dispatch() - - files_root.value = 'slave/files' - self.slave_volume = Volume('slave', [User, Document]) - self.slave_server = coroutine.WSGIServer(('127.0.0.1', 9001), Router(SlaveRoutes('slave/node', self.slave_volume))) - coroutine.spawn(self.slave_server.serve_forever) - coroutine.dispatch() - - def tearDown(self): - self.master_server.stop() - self.slave_server.stop() - tests.Test.tearDown(self) - - def test_Push(self): - client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value)) - - # Sync users - client.get(cmd='logon') - client.post(cmd='online-sync') - self.assertEqual([[4, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - guid1 = client.post(['document'], {'context': '', 'message': '1', 'title': '', 'type': 'post'}) - guid2 = client.post(['document'], {'context': '', 'message': '2', 'title': '', 'type': 'post'}) - - client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}}, - {'guid': guid2, 'message': {'en-us': '2'}}, - ], - [i.properties(['guid', 'message']) for i in self.master_volume['document'].find()[0]]) - self.assertEqual([[6, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[4, None]], json.load(file('slave/push.sequence'))) - - guid3 = client.post(['document'], {'context': '', 'message': '3', 'title': '', 'type': 'post'}) - client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}}, - {'guid': guid2, 'message': {'en-us': '2'}}, - {'guid': guid3, 'message': {'en-us': '3'}}, - ], - [i.properties(['guid', 'message']) for i in self.master_volume['document'].find()[0]]) - self.assertEqual([[7, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[5, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.put(['document', guid2], {'message': '22'}) - client.post(cmd='online-sync') - self.assertEqual( - {'guid': guid2, 'message': {'en-us': '22'}}, - self.master_volume['document'].get(guid2).properties(['guid', 'message'])) - self.assertEqual([[8, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[6, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.delete(['document', guid1]) - client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}, 'layer': ['deleted']}, - {'guid': guid2, 'message': {'en-us': '22'}, 'layer': []}, - {'guid': guid3, 'message': {'en-us': '3'}, 'layer': []}, - ], - [i.properties(['guid', 'message', 'layer']) for i in self.master_volume['document'].find()[0]]) - self.assertEqual([[9, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[7, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.put(['document', guid1], {'message': 'a'}) - client.put(['document', guid2], {'message': 'b'}) - client.put(['document', guid3], {'message': 'c'}) - guid4 = client.post(['document'], {'context': '', 'message': 'd', 'title': '', 'type': 'post'}) - client.delete(['document', guid2]) - client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': 'a'}, 'layer': ['deleted']}, - {'guid': guid2, 'message': {'en-us': 'b'}, 'layer': ['deleted']}, - {'guid': guid3, 'message': {'en-us': 'c'}, 'layer': []}, - {'guid': guid4, 'message': {'en-us': 'd'}, 'layer': []}, - ], - [i.properties(['guid', 'message', 'layer']) for i in self.master_volume['document'].find()[0]]) - self.assertEqual([[13, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[12, None]], json.load(file('slave/push.sequence'))) - - def test_PushStats(self): - stats_user.stats_user.value = True - client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value)) - client.post(cmd='online-sync') - self.assertEqual(['ok'], self.stats_commit) - self.assertEqual([{'stats': 'probe'}], self.stats_merge) - - def test_Pull(self): - client = Connection('http://127.0.0.1:9000', auth=http.SugarAuth(keyfile.value)) - slave_client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value)) - - # Sync users - slave_client.get(cmd='logon') - slave_client.post(cmd='online-sync') - self.assertEqual([[4, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - guid1 = client.post(['document'], {'context': '', 'message': '1', 'title': '', 'type': 'post'}) - guid2 = client.post(['document'], {'context': '', 'message': '2', 'title': '', 'type': 'post'}) - - slave_client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}}, - {'guid': guid2, 'message': {'en-us': '2'}}, - ], - [i.properties(['guid', 'message']) for i in self.slave_volume['document'].find()[0]]) - self.assertEqual([[6, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - guid3 = client.post(['document'], {'context': '', 'message': '3', 'title': '', 'type': 'post'}) - slave_client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}}, - {'guid': guid2, 'message': {'en-us': '2'}}, - {'guid': guid3, 'message': {'en-us': '3'}}, - ], - [i.properties(['guid', 'message']) for i in self.slave_volume['document'].find()[0]]) - self.assertEqual([[7, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.put(['document', guid2], {'message': '22'}) - slave_client.post(cmd='online-sync') - self.assertEqual( - {'guid': guid2, 'message': {'en-us': '22'}}, - self.slave_volume['document'].get(guid2).properties(['guid', 'message'])) - self.assertEqual([[8, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.delete(['document', guid1]) - slave_client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': '1'}, 'layer': ['deleted']}, - {'guid': guid2, 'message': {'en-us': '22'}, 'layer': []}, - {'guid': guid3, 'message': {'en-us': '3'}, 'layer': []}, - ], - [i.properties(['guid', 'message', 'layer']) for i in self.slave_volume['document'].find()[0]]) - self.assertEqual([[9, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - coroutine.sleep(1) - client.put(['document', guid1], {'message': 'a'}) - client.put(['document', guid2], {'message': 'b'}) - client.put(['document', guid3], {'message': 'c'}) - guid4 = client.post(['document'], {'context': '', 'message': 'd', 'title': '', 'type': 'post'}) - client.delete(['document', guid2]) - slave_client.post(cmd='online-sync') - self.assertEqual([ - {'guid': guid1, 'message': {'en-us': 'a'}, 'layer': ['deleted']}, - {'guid': guid2, 'message': {'en-us': 'b'}, 'layer': ['deleted']}, - {'guid': guid3, 'message': {'en-us': 'c'}, 'layer': []}, - {'guid': guid4, 'message': {'en-us': 'd'}, 'layer': []}, - ], - [i.properties(['guid', 'message', 'layer']) for i in self.slave_volume['document'].find()[0]]) - self.assertEqual([[14, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - def test_PullFiles(self): - self.touch(('master/files/1', 'a', 1)) - self.touch(('master/files/2/2', 'bb', 2)) - self.touch(('master/files/3/3/3', 'ccc', 3)) - os.utime('master/files', (1, 1)) - - client = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value)) - client.post(cmd='online-sync') - - files, stamp = json.load(file('master/files.index')) - self.assertEqual(1, stamp) - self.assertEqual(sorted([ - [2, '1', 1], - [3, '2/2', 2], - [4, '3/3/3', 3], - ]), - sorted(files)) - - self.assertEqual([[5, None]], json.load(file('slave/files.sequence'))) - self.assertEqual('a', file('slave/files/1').read()) - self.assertEqual('bb', file('slave/files/2/2').read()) - self.assertEqual('ccc', file('slave/files/3/3/3').read()) - - def test_PullFromPreviouslyMergedRecord(self): - master = Connection('http://127.0.0.1:9000', auth=http.SugarAuth(keyfile.value)) - slave = Connection('http://127.0.0.1:9001', auth=http.SugarAuth(keyfile.value)) - - # Sync users - slave.get(cmd='logon') - slave.post(cmd='online-sync') - self.assertEqual([[4, None]], json.load(file('slave/pull.sequence'))) - self.assertEqual([[2, None]], json.load(file('slave/push.sequence'))) - - guid = slave.post(['document'], {'context': '', 'message': '1', 'title': '1', 'type': 'post'}) - slave.post(cmd='online-sync') - - coroutine.sleep(1) - master.put(['document', guid], {'message': '1_'}) - slave.put(['document', guid], {'title': '1_'}) - slave.post(cmd='online-sync') - - self.assertEqual( - {'message': {'en-us': '1_'}, 'title': {'en-us': '1_'}}, - self.master_volume['document'].get(guid).properties(['message', 'title'])) - self.assertEqual( - {'message': {'en-us': '1_'}, 'title': {'en-us': '1_'}}, - self.slave_volume['document'].get(guid).properties(['message', 'title'])) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py index 89702a3..ee726bd 100644 --- a/tests/units/toolkit/__main__.py +++ b/tests/units/toolkit/__main__.py @@ -6,7 +6,6 @@ from coroutine import * from http import * from lsb_release import * from mountpoints import * -from rrd import * from toolkit import * from options import * from spec import * diff --git a/tests/units/toolkit/rrd.py b/tests/units/toolkit/rrd.py deleted file mode 100755 index 3434634..0000000 --- a/tests/units/toolkit/rrd.py +++ /dev/null @@ -1,291 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import time - -import rrdtool - -from __init__ import tests - -from sugar_network.toolkit import rrd - - -class RrdTest(tests.Test): - - def setUp(self): - tests.Test.setUp(self) - rrd._FETCH_PAGE = 100 - rrd._rrdtool = rrdtool - - def test_Db(self): - ts = int(time.time()) + 100 - - rrdtool.create('test.rrd', - '--start', str(ts), - '-s', '1', - 'DS:f1:GAUGE:1:2:3', - 'DS:f2:COUNTER:4:5:6', - 'RRA:AVERAGE:0.1:7:8', - 'RRA:LAST:0.2:9:10', - ) - - db = rrd._Db('test.rrd') - self.assertEqual(1, db.step) - self.assertEqual(ts, db.last) - self.assertEqual(2, len(db.fields)) - self.assertEqual(2, len(db.rras)) - - self.assertEqual('f1', db.fields[0]['name']) - self.assertEqual('GAUGE', db.fields[0]['type']) - self.assertEqual(1, db.fields[0]['minimal_heartbeat']) - self.assertEqual(2, db.fields[0]['min']) - self.assertEqual(3, db.fields[0]['max']) - - self.assertEqual('f2', db.fields[1]['name']) - self.assertEqual('COUNTER', db.fields[1]['type']) - self.assertEqual(4, db.fields[1]['minimal_heartbeat']) - self.assertEqual(5, db.fields[1]['min']) - self.assertEqual(6, db.fields[1]['max']) - - self.assertEqual('RRA:AVERAGE:0.1:7:8', db.rras[0]) - self.assertEqual('RRA:LAST:0.2:9:10', db.rras[1]) - - def test_DbSet_load(self): - rrdtool.create('1.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8') - rrdtool.create('2.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8') - rrdtool.create('3.rrd', 'DS:f:GAUGE:1:2:3', 'RRA:AVERAGE:0.1:7:8') - - dbset = rrd._DbSet('.', None, None, None) - dbset.load('1.rrd', 1) - self.assertEqual( - ['./1.rrd'], - [i.path for i in dbset._revisions]) - dbset.load('2.rrd' ,2) - self.assertEqual( - ['./1.rrd', './2.rrd'], - [i.path for i in dbset._revisions]) - dbset.load('3.rrd', 3) - self.assertEqual( - ['./1.rrd', './2.rrd', './3.rrd'], - [i.path for i in dbset._revisions]) - - dbset = rrd._DbSet('.', None, None, None) - dbset.load('3.rrd', 3) - self.assertEqual( - ['./3.rrd'], - [i.path for i in dbset._revisions]) - dbset.load('2.rrd', 2) - self.assertEqual( - ['./2.rrd', './3.rrd'], - [i.path for i in dbset._revisions]) - dbset.load('1.rrd', 1) - self.assertEqual( - ['./1.rrd', './2.rrd', './3.rrd'], - [i.path for i in dbset._revisions]) - - def test_DbSet_put_ToNewDbAndSkipOlds(self): - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - - ts = int(time.time()) - dbset.put({'f1': 1, 'f2': 1}, ts) - __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3)) - self.assertEqual('f1', f1) - self.assertEqual('f2', f2) - assert (1, 1) in values - - dbset.put({'f1': 2, 'f2': 2}, ts) - ts = int(time.time()) - __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3)) - assert (2, 2) not in values - - dbset.put({'f1': 3, 'f2': 3}, ts + 1) - ts = int(time.time()) - __, (f1, f2), values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 3)) - assert (3, 3) in values - - def test_DbSet_put_WithChangedLayout(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1}, ts) - self.assertEqual('./test.rrd', dbset._get_db(0).path) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - dbset.put({'f1': 2, 'f2': 2}, ts) - assert dbset._get_db(0) is None - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - dbset.put({'f1': 2, 'f2': 2}, ts + 1) - self.assertEqual('./test-1.rrd', dbset._get_db(0).path) - - __, __, values = rrdtool.fetch('test.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 10)) - assert (1,) in values - assert (2, 2) not in values - - __, __, values = rrdtool.fetch('test-1.rrd', 'AVERAGE', '-s', str(ts - 1), '-e', str(ts + 10)) - assert (1,) not in values - assert (2, 2) in values - - def test_DbSet_put_WithChangedRRA(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1}, ts) - self.assertEqual('./test.rrd', dbset._get_db(0).path) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.1:1:10']) - dbset.load('test.rrd', 0) - dbset.put({'f1': 1}, ts + 1) - self.assertEqual('./test-1.rrd', dbset._get_db(0).path) - - def test_DbSet_put_WithChangedStep(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1}, ts) - self.assertEqual('./test.rrd', dbset._get_db(0).path) - - dbset = rrd._DbSet('.', 'test', 2, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - dbset.put({'f1': 1}, ts + 2) - self.assertEqual('./test-1.rrd', dbset._get_db(0).path) - - def test_DbSet_get_OneDb(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1, 'f2': 1}, ts) - dbset.put({'f1': 2, 'f2': 2}, ts + 1) - dbset.put({'f1': 3, 'f2': 3}, ts + 2) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0})], - [(t, i) for t, i in dbset.get(ts, ts)]) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 1, {'f1': 2.0, 'f2': 2.0})], - [(t, i) for t, i in dbset.get(ts, ts + 1)]) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 1, {'f1': 2.0, 'f2': 2.0}), (ts + 2, {'f1': 3.0, 'f2': 3.0})], - [(t, i) for t, i in dbset.get(ts, ts + 2)]) - - def test_DbSet_get_OneDbLongSteps(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 3, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1, 'f2': 1}, ts) - dbset.put({'f1': 2, 'f2': 2}, ts + 3) - dbset.put({'f1': 3, 'f2': 3}, ts + 6) - - ts = ts / 3 * 3 - - dbset = rrd._DbSet('.', 'test', 3, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0})], - [(t, i) for t, i in dbset.get(ts, ts)]) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 3, {'f1': 2.0, 'f2': 2.0})], - [(t, i) for t, i in dbset.get(ts, ts + 3)]) - self.assertEqual( - [(ts, {'f1': 1.0, 'f2': 1.0}), (ts + 3, {'f1': 2.0, 'f2': 2.0}), (ts + 6, {'f1': 3.0, 'f2': 3.0})], - [(t, i) for t, i in dbset.get(ts, ts + 6)]) - - def test_DbSet_get_MultipeDbs(self): - ts = int(time.time()) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.put({'f1': 1, 'f2': 1}, ts) - ts = dbset._get_db(0).last - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.6:1:10']) - dbset.load('test.rrd', 0) - dbset.put({'f1': 2, 'f2': 2}, ts + 1) - dbset.put({'f1': 3, 'f2': 3}, ts + 2) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.7:1:10']) - dbset.load('test.rrd', 0) - dbset.load('test-1.rrd', 1) - dbset.put({'f1': 4, 'f2': 4}, ts + 3) - dbset.put({'f1': 5, 'f2': 5}, ts + 4) - dbset.put({'f1': 6, 'f2': 6}, ts + 5) - - dbset = rrd._DbSet('.', 'test', 1, ['RRA:AVERAGE:0.5:1:10']) - dbset.load('test.rrd', 0) - dbset.load('test-1.rrd', 1) - dbset.load('test-2.rrd', 2) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts)]) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - (ts + 1, {'f1': 2.0, 'f2': 2.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts + 1)]) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - (ts + 1, {'f1': 2.0, 'f2': 2.0}), - (ts + 2, {'f1': 3.0, 'f2': 3.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts + 2)]) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - (ts + 1, {'f1': 2.0, 'f2': 2.0}), - (ts + 2, {'f1': 3.0, 'f2': 3.0}), - (ts + 3, {'f1': 4.0, 'f2': 4.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts + 3)]) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - (ts + 1, {'f1': 2.0, 'f2': 2.0}), - (ts + 2, {'f1': 3.0, 'f2': 3.0}), - (ts + 3, {'f1': 4.0, 'f2': 4.0}), - (ts + 4, {'f1': 5.0, 'f2': 5.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts + 4)]) - self.assertEqual( - [ - (ts, {'f1': 1.0, 'f2': 1.0}), - (ts + 1, {'f1': 2.0, 'f2': 2.0}), - (ts + 2, {'f1': 3.0, 'f2': 3.0}), - (ts + 3, {'f1': 4.0, 'f2': 4.0}), - (ts + 4, {'f1': 5.0, 'f2': 5.0}), - (ts + 5, {'f1': 6.0, 'f2': 6.0}), - ], - [(t, i) for t, i in dbset.get(ts, ts + 5)]) - - def test_NoTimestampDupes(self): - start_ts = int(time.time()) - end_ts = start_ts + 86400 * 3 - - dbset = rrd._DbSet('.', 'test', 300, [ - 'RRA:AVERAGE:0.5:1:288', - 'RRA:AVERAGE:0.5:3:672', - 'RRA:AVERAGE:0.5:12:744', - 'RRA:AVERAGE:0.5:144:732', - ]) - for i in xrange((end_ts - start_ts) / 300): - dbset.put({'f': i}, start_ts + i * 300) - - prev_ts = -1 - prev_value = -1 - for ts, value in dbset.get(start_ts, end_ts, 86400): - value = value['f'] - assert ts > prev_ts - assert value > prev_value - prev_ts = ts - prev_value = value - - -if __name__ == '__main__': - tests.main() |