diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2013-03-01 15:29:59 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2013-03-01 15:29:59 (GMT) |
commit | a6e32740d3a374a0f07f0bebabaa50395480f0a0 (patch) | |
tree | 31341280866c910a67427c27d7e66823f2df27cd | |
parent | fa1586847d81cc71eec09cce5d98fa79800cd6fc (diff) |
Fix sync on master side
-rwxr-xr-x | sugar-network-client | 9 | ||||
-rwxr-xr-x | sugar-network-node | 6 | ||||
-rw-r--r-- | sugar_network/node/__init__.py | 5 | ||||
-rw-r--r-- | sugar_network/node/downloads.py | 128 | ||||
-rw-r--r-- | sugar_network/node/master.py | 168 | ||||
-rw-r--r-- | sugar_network/node/slave.py | 9 | ||||
-rw-r--r-- | sugar_network/node/sync.py | 42 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 9 | ||||
-rw-r--r-- | sugar_network/toolkit/util.py | 6 | ||||
-rw-r--r-- | tests/__init__.py | 5 | ||||
-rw-r--r-- | tests/units/node/__main__.py | 1 | ||||
-rwxr-xr-x | tests/units/node/downloads.py | 119 | ||||
-rwxr-xr-x | tests/units/node/sync.py | 242 | ||||
-rwxr-xr-x | tests/units/node/sync_master.py | 879 |
14 files changed, 904 insertions, 724 deletions
diff --git a/sugar-network-client b/sugar-network-client index 5d7c652..c2f4a81 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -52,8 +52,8 @@ class Application(application.Daemon): application.logdir.value = join(client.local_root.value, 'log') else: application.logdir.value = sugar.profile_path('logs') - if not exists(toolkit.tmpdir.value): - os.makedirs(toolkit.tmpdir.value) + if not exists(toolkit.cachedir.value): + os.makedirs(toolkit.cachedir.value) application.rundir.value = join(client.local_root.value, 'run') coroutine.signal(signal.SIGCHLD, self.__SIGCHLD_cb) @@ -196,12 +196,13 @@ application.debug.value = sugar.logger_level() node.trust_users.value = True # If tmpfs is mounted to /tmp, `os.fstat()` will return 0 free space # and will brake offline synchronization logic -toolkit.tmpdir.value = sugar.profile_path('tmp') +toolkit.cachedir.value = sugar.profile_path('tmp') Option.seek('main', application) +Option.seek('main', [toolkit.cachedir]) Option.seek('webui', webui) Option.seek('client', client) -Option.seek('client', [sugar.keyfile, toolkit.tmpdir]) +Option.seek('client', [sugar.keyfile]) Option.seek('node', [node.port, node.files_root]) Option.seek('stats', stats_user) Option.seek('db', db) diff --git a/sugar-network-node b/sugar-network-node index f1a58ee..4cceb30 100755 --- a/sugar-network-node +++ b/sugar-network-node @@ -41,8 +41,8 @@ class Application(application.Daemon): jobs = coroutine.Pool() def ensure_run(self): - if toolkit.tmpdir.value and not exists(toolkit.tmpdir.value): - os.makedirs(toolkit.tmpdir.value) + if toolkit.cachedir.value and not exists(toolkit.cachedir.value): + os.makedirs(toolkit.cachedir.value) if not exists(node.data_root.value): os.makedirs(node.data_root.value) enforce(os.access(node.data_root.value, os.W_OK), @@ -121,10 +121,10 @@ monkey.patch_time() locale.setlocale(locale.LC_ALL, '') Option.seek('main', application) +Option.seek('main', [toolkit.cachedir]) Option.seek('webui', webui) Option.seek('client', [client.api_url]) Option.seek('node', node) -Option.seek('node', [toolkit.tmpdir]) Option.seek('node-stats', stats_node) Option.seek('user-stats', stats_user) Option.seek('obs', obs) diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index 08c8eea..f0d51c8 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -58,3 +58,8 @@ stats_root = Option( files_root = Option( 'path to a directory to keep files synchronized between nodes', default='/var/lib/sugar-network/files', name='files_root') + +pull_timeout = Option( + 'delay in seconds to return to sync-pull requester to wait until ' + 'pull request will be ready', + default=30, type_cast=int) diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py new file mode 100644 index 0000000..b2bd667 --- /dev/null +++ b/sugar_network/node/downloads.py @@ -0,0 +1,128 @@ +# Copyright (C) 2013 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Persistent pool with temporary files prepared to download.""" + +import os +import json +import hashlib +import logging +from glob import glob +from os.path import join, splitext, exists + +from sugar_network.toolkit import pylru, coroutine, exception + + +# Maximum numer of postponed pulls master can handle at the same time +_POOL_SIZE = 256 +_TAG_SUFFIX = '.tag' + +_logger = logging.getLogger('node.downloads') + + +class Pool(object): + + def __init__(self, root): + self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop()) + if not exists(root): + os.makedirs(root) + self._root = root + + for tag_path in glob(join(root, '*.tag')): + path, __ = splitext(tag_path) + if exists(path): + try: + with file(tag_path) as f: + key, tag = json.load(f) + pool_key = json.dumps(key) + self._pool[pool_key] = _Download(key, tag, path) + continue + except Exception: + exception('Cannot open %r download, recreate', tag_path) + os.unlink(path) + os.unlink(tag_path) + + def get(self, key): + key = json.dumps(key) + if key in self._pool: + return self._pool[key] + + def set(self, key, tag, fetcher, *args, **kwargs): + pool_key = json.dumps(key) + path = join(self._root, hashlib.sha1(pool_key).hexdigest()) + + def do_fetch(): + try: + complete = fetcher(*args, path=path, **kwargs) + except Exception: + exception('Error while fetching %r', self) + if exists(path): + os.unlink(path) + return True + with file(path + _TAG_SUFFIX, 'w') as f: + json.dump([key, tag], f) + return complete + + job = coroutine.spawn(do_fetch) + dl = self._pool[pool_key] = _Download(key, tag, path, job) + return dl + + def remove(self, key): + key = json.dumps(key) + if key in self._pool: + self._pool.peek(key).pop() + del self._pool[key] + + +class _Download(dict): + + def __init__(self, key, tag, path, job=None): + self.tag = tag + self._key = key + self._path = path + self._job = job + + def __repr__(self): + return '<Download %r path=%r>' % (self._key, self._path) + + @property + def ready(self): + # pylint: disable-msg=E1101 + return self._job is None or self._job.dead + + @property + def complete(self): + return self._job is not None and self._job.value + + @property + def length(self): + if exists(self._path): + return os.stat(self._path).st_size + + def open(self): + if exists(self._path): + return file(self._path, 'rb') + + def pop(self): + if self._job is not None: + _logger.debug('Abort fetching %r', self) + self._job.kill() + + if exists(self._path): + os.unlink(self._path) + if exists(self._path + _TAG_SUFFIX): + os.unlink(self._path + _TAG_SUFFIX) + + _logger.debug('Throw out %r from the pool', self) diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index b6a387b..dd7ff22 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -13,14 +13,17 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import base64 import logging from urlparse import urlsplit +from Cookie import SimpleCookie from os.path import join -from sugar_network import db, client -from sugar_network.node import sync, stats_user, files_root, files, volume +from sugar_network import db, client, node +from sugar_network.node import sync, stats_user, files, volume, downloads from sugar_network.node.commands import NodeCommands -from sugar_network.toolkit import util +from sugar_network.toolkit import cachedir, util, enforce _logger = logging.getLogger('node.master') @@ -31,29 +34,107 @@ class MasterCommands(NodeCommands): def __init__(self, volume_): guid = urlsplit(client.api_url.value).netloc NodeCommands.__init__(self, True, guid, volume_) + + self._pulls = { + 'pull': lambda layer, seq, out_seq=None: + ('diff', None, volume.diff(self.volume, seq, out_seq, layer)), + 'files_pull': lambda layer, seq, out_seq=None: + ('files_diff', None, self._files.diff(seq, out_seq)), + } + + self._pull_queue = downloads.Pool(join(cachedir.value, 'pulls')) self._files = None - if files_root.value: - self._files = files.Index(files_root.value, + if node.files_root.value: + self._files = files.Index(node.files_root.value, join(volume_.root, 'files.index'), volume_.seqno) @db.volume_command(method='POST', cmd='sync', permissions=db.ACCESS_AUTH) def sync(self, request): + reply, cookie = self._push(request) + for op, layer, seq in cookie: + reply.append(self._pulls[op](layer, seq)) + return sync.encode(src=self.guid, *reply) + + @db.volume_command(method='POST', cmd='push') + def push(self, request, response): + reply, cookie = self._push(request) + # Read passed cookie only after excluding `merged_seq`. + # If there is `pull` out of currently pushed packet, excluding + # `merged_seq` should not affect it. + cookie.update(_Cookie(request)) + cookie.store(response) + return sync.encode(src=self.guid, *reply) + + @db.volume_command(method='GET', cmd='pull', + mime_type='application/octet-stream', + arguments={'accept_length': db.to_int}) + def pull(self, request, response, accept_length=None): + cookie = _Cookie(request) + if not cookie: + _logger.warning('Requested full dump in pull command') + cookie.append(('pull', None, [[1, None]])) + cookie.append(('files_pull', None, [[1, None]])) + + reply = None + for pull_key in cookie: + op, layer, seq = pull_key + + pull = self._pull_queue.get(pull_key) + if pull is not None: + if not pull.ready: + continue + if not pull.tag: + self._pull_queue.remove(pull_key) + cookie.remove(pull_key) + continue + if accept_length is None or pull.length <= accept_length: + _logger.debug('Found ready to use %r', pull) + if pull.complete: + cookie.remove(pull_key) + else: + seq.exclude(pull.tag) + reply = pull.open() + break + _logger.debug('Existing %r is too big, will recreate', pull) + self._pull_queue.remove(pull_key) + + out_seq = util.Sequence() + pull = self._pull_queue.set(pull_key, out_seq, + sync.sneakernet_encode, + [self._pulls[op](layer, seq, out_seq)], + limit=accept_length, src=self.guid) + _logger.debug('Start new %r', pull) + + if reply is None: + if cookie: + _logger.debug('No ready pulls') + # TODO Might be useful to set meaningful value here + cookie.delay = node.pull_timeout.value + else: + _logger.debug('Nothing to pull') + + cookie.store(response) + return reply + + def _push(self, request): reply = [] - merged_seq = util.Sequence([]) + cookie = _Cookie() pull_seq = None + merged_seq = util.Sequence([]) for packet in sync.decode(request.content_stream): + src = packet['src'] + enforce(packet['dst'] == self.guid, 'Misaddressed packet') + if packet.name == 'pull': - pull_seq = util.Sequence(packet['sequence']) - reply.append(('diff', {}, volume.diff(self.volume, pull_seq))) + pull_seq = cookie['pull', packet['layer']] + pull_seq.include(packet['sequence']) elif packet.name == 'files_pull': if self._files is not None: - seq = util.Sequence(packet['sequence']) - reply.append(('files_diff', None, self._files.diff(seq))) + cookie['files_pull'].include(packet['sequence']) elif packet.name == 'diff': - src = packet['src'] seq, ack_seq = volume.merge(self.volume, packet) reply.append(('ack', { 'ack': ack_seq, @@ -62,12 +143,65 @@ class MasterCommands(NodeCommands): }, None)) merged_seq.include(ack_seq) elif packet.name == 'stats_diff': - src = packet['src'] - seq = stats_user.merge(packet) - reply.append(('stats_ack', - {'sequence': seq, 'dst': src}, None)) + reply.append(('stats_ack', { + 'sequence': stats_user.merge(packet), + 'dst': src, + }, None)) - if pull_seq: + if pull_seq is not None: pull_seq.exclude(merged_seq) - return sync.encode(src=self.guid, *reply) + return reply, cookie + + +class _Cookie(list): + + def __init__(self, request=None): + list.__init__(self) + if request is not None: + self.update(self._get_cookie(request, 'sugar_network_sync') or []) + self.delay = 0 + + def update(self, that): + for op, layer, seq in that: + self[op, layer].include(seq) + + def store(self, response): + if self: + _logger.debug('Postpone %r in cookie', self) + to_store = base64.b64encode(json.dumps(self)) + self._set_cookie(response, 'sugar_network_sync', to_store) + self._set_cookie(response, 'sugar_network_delay', self.delay) + else: + self._unset_cookie(response, 'sugar_network_sync') + self._unset_cookie(response, 'sugar_network_delay') + + def __getitem__(self, key): + if not isinstance(key, tuple): + key = (key, None) + for op, layer, seq in self: + if (op, layer) == key: + return seq + seq = util.Sequence() + self.append(key + (seq,)) + return seq + + def _get_cookie(self, request, name): + cookie_str = request.environ.get('HTTP_COOKIE') + if not cookie_str: + return + cookie = SimpleCookie() + cookie.load(cookie_str) + if name not in cookie: + return + value = cookie.get(name).value + if value != 'unset_%s' % name: + return json.loads(base64.b64decode(value)) + + def _set_cookie(self, response, name, value, age=3600): + response.setdefault('Set-Cookie', []) + cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age) + response['Set-Cookie'].append(cookie) + + def _unset_cookie(self, response, name): + self._set_cookie(response, name, 'unset_%s' % name, 0) diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 0f6f5a4..edd6a3f 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -47,15 +47,16 @@ class SlaveCommands(NodeCommands): @db.volume_command(method='POST', cmd='online_sync', permissions=db.ACCESS_LOCAL) def online_sync(self): - push = [('diff', {'src': self.guid}, - volume.diff(self.volume, self._push_seq)), + push = [('diff', None, volume.diff(self.volume, self._push_seq)), ('pull', {'sequence': self._pull_seq}, None), ('files_pull', {'sequence': self._files_seq}, None), ] if stats_user.stats_user.value: - push.append(('stats_diff', {'src': self.guid}, stats_user.diff())) + push.append(('stats_diff', None, stats_user.diff())) response = Client().request('POST', - data=sync.chunked_encode(*push), params={'cmd': 'sync'}, + data=sync.chunked_encode(*push, + src=self.guid, dst=self._master_guid), + params={'cmd': 'sync'}, headers={'Transfer-Encoding': 'chunked'}) self._import(sync.decode(response.raw), None) diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py index 010bb9b..8a925da 100644 --- a/sugar_network/node/sync.py +++ b/sugar_network/node/sync.py @@ -15,13 +15,13 @@ import os import gzip +import json import logging -import cPickle as pickle from types import GeneratorType from os.path import exists, join, dirname from sugar_network import db -from sugar_network.toolkit import BUFFER_SIZE, util, enforce +from sugar_network.toolkit import BUFFER_SIZE, coroutine, util, enforce # Leave at leat n bytes in fs whle calling `encode_to_file()` @@ -48,8 +48,8 @@ def limited_encode(limit, *packets, **header): return _encode(limit, packets, header, _EncodingStatus()) -def chunked_encode(*packets): - return _ChunkedEncoder(encode(*packets)) +def chunked_encode(*packets, **header): + return _ChunkedEncoder(encode(*packets, **header)) def sneakernet_decode(root, node=None, session=None): @@ -59,7 +59,7 @@ def sneakernet_decode(root, node=None, session=None): continue zipfile = gzip.open(join(root, filename), 'rb') try: - package_props = pickle.load(zipfile) + package_props = json.loads(zipfile.readline()) if node is not None and package_props.get('src') == node: if package_props.get('session') == session: @@ -95,7 +95,8 @@ def sneakernet_encode(packets, root=None, limit=None, path=None, **header): with file(path, 'wb') as package: zipfile = gzip.GzipFile(fileobj=package) try: - pickle.dump(header, zipfile) + json.dump(header, zipfile) + zipfile.write('\n') pos = None encoder = _encode(limit, packets, None, status) @@ -104,6 +105,7 @@ def sneakernet_encode(packets, root=None, limit=None, path=None, **header): chunk = encoder.send(pos) zipfile.write(chunk) pos = zipfile.fileobj.tell() + coroutine.dispatch() except StopIteration: break @@ -135,7 +137,7 @@ def _encode(limit, packets, header, status): if header: props.update(header) props['packet'] = packet - pos = (yield pickle.dumps(props)) or 0 + pos = (yield json.dumps(props) + '\n') or 0 if content is None: continue @@ -150,7 +152,7 @@ def _encode(limit, packets, header, status): blob = record.pop('blob') blob_size = record['blob_size'] = os.stat(blob).st_size - dump = pickle.dumps(record) + dump = json.dumps(record) + '\n' if not status.aborted and limit is not None and \ pos + len(dump) + blob_size > limit: status.aborted = True @@ -158,7 +160,7 @@ def _encode(limit, packets, header, status): raise StopIteration() record = content.throw(StopIteration()) continue - pos = (yield pickle.dumps(record)) or 0 + pos = (yield dump) or 0 if blob is not None: with file(blob, 'rb') as f: @@ -172,7 +174,7 @@ def _encode(limit, packets, header, status): except StopIteration: pass - yield pickle.dumps({'packet': 'last'}) + yield json.dumps({'packet': 'last'}) + '\n' class _ChunkedEncoder(object): @@ -235,12 +237,16 @@ class _PacketsIterator(object): return self.props.get(key) def __iter__(self): + blob = None while True: - try: - record = pickle.load(self._stream) - except EOFError: + if blob is not None and blob.size_to_read: + self._stream.seek(blob.size_to_read, 1) + blob = None + record = self._stream.readline() + if not record: self._name = None - raise + raise EOFError() + record = json.loads(record) packet = record.get('packet') if packet: self._name = packet @@ -249,7 +255,7 @@ class _PacketsIterator(object): break blob_size = record.get('blob_size') if blob_size: - record['blob'] = _Blob(self._stream, blob_size) + blob = record['blob'] = _Blob(self._stream, blob_size) yield record def __enter__(self): @@ -263,9 +269,9 @@ class _Blob(object): def __init__(self, stream, size): self._stream = stream - self._size_to_read = size + self.size_to_read = size def read(self, size=BUFFER_SIZE): - chunk = self._stream.read(min(size, self._size_to_read)) - self._size_to_read -= len(chunk) + chunk = self._stream.read(min(size, self.size_to_read)) + self.size_to_read -= len(chunk) return chunk diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index ffb0b8f..7673016 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -23,11 +23,10 @@ from sugar_network.toolkit.options import Option BUFFER_SIZE = 1024 * 10 -tmpdir = Option( - 'if specified, use this directory for temporary files; such files ' - 'might take considerable number of bytes while downloading of ' - 'synchronizing Sugar Network content', - name='tmpdir') +cachedir = Option( + 'path to a directory to keep cached files; such files ' + 'might take considerable number of bytes', + default='/var/cache/sugar-network', name='cachedir') def enforce(condition, error=None, *args): diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py index dcbe2db..727edba 100644 --- a/sugar_network/toolkit/util.py +++ b/sugar_network/toolkit/util.py @@ -24,7 +24,7 @@ import collections from os.path import exists, join, islink, isdir, dirname, basename, abspath from os.path import lexists, isfile -from sugar_network.toolkit import tmpdir, enforce +from sugar_network.toolkit import cachedir, enforce _logger = logging.getLogger('toolkit.util') @@ -306,8 +306,8 @@ def unique_filename(root, filename): def NamedTemporaryFile(*args, **kwargs): - if tmpdir.value: - kwargs['dir'] = tmpdir.value + if cachedir.value: + kwargs['dir'] = cachedir.value return tempfile.NamedTemporaryFile(*args, **kwargs) diff --git a/tests/__init__.py b/tests/__init__.py index 4e73db1..92c0fae 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -27,7 +27,7 @@ from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.resources.implementation import Implementation from sugar_network.node.commands import NodeCommands -from sugar_network.node import stats_user, stats_node, obs, auth, slave +from sugar_network.node import stats_user, stats_node, obs, auth, slave, downloads from sugar_network.resources.volume import Volume @@ -104,11 +104,12 @@ class Test(unittest.TestCase): obs._client = None http._RECONNECTION_NUMBER = 0 auth.reset() - toolkit.tmpdir.value = tmpdir + '/tmp' + toolkit.cachedir.value = tmpdir + '/tmp' injector.invalidate_solutions(None) injector._pms_path = None journal._ds_root = tmpdir + '/datastore' solver.nodeps = False + downloads._POOL_SIZE = 256 Volume.RESOURCES = [ 'sugar_network.resources.user', diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py index 4a0d50c..684e1b7 100644 --- a/tests/units/node/__main__.py +++ b/tests/units/node/__main__.py @@ -3,6 +3,7 @@ from __init__ import tests from auth import * +from downloads import * from files import * from node import * from obs import * diff --git a/tests/units/node/downloads.py b/tests/units/node/downloads.py new file mode 100755 index 0000000..29ea5d0 --- /dev/null +++ b/tests/units/node/downloads.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import json +from os.path import exists + +from __init__ import tests + +from sugar_network.node import downloads +from sugar_network.toolkit import coroutine + + +class DownloadsTest(tests.Test): + + def test_populate(self): + self.touch(('1', '1')) + self.touch(('2', '2')) + self.touch(('2.tag', json.dumps((2, {'file': 2})))) + self.touch(('3.tag', json.dumps((3, {'file': 3})))) + + pool = downloads.Pool('.') + self.assertEqual(None, pool.get(1)) + self.assertEqual(2, pool.get(2).tag['file']) + self.assertEqual('2', pool.get(2).open().read()) + self.assertEqual(None, pool.get(3)) + + def test_ComplexKeys(self): + key = {-1: None} + + self.touch(('file', 'file')) + self.touch(('file.tag', json.dumps((key, {'file': 2})))) + + pool = downloads.Pool('.') + self.assertEqual('file', pool.get(key).open().read()) + key['foo'] = 'bar' + pool.set(key, None, lambda path: file(path, 'w').close()) + self.assertNotEqual(None, pool.get(key)) + + def test_set_Tags(self): + tag = [] + + def fetch(path): + with file(path, 'w') as f: + f.write('payload') + tag.append(True) + + pool = downloads.Pool('.') + dl = pool.set('key', tag, fetch) + self.assertEqual(False, dl.ready) + self.assertEqual(None, dl.open()) + self.assertEqual([], tag) + + coroutine.dispatch() + self.assertEqual(True, dl.ready) + self.assertEqual('payload', dl.open().read()) + self.assertEqual([True], tag) + + pool2 = downloads.Pool('.') + dl2 = pool2.get('key') + self.assertEqual(True, dl2.ready) + self.assertEqual('payload', dl2.open().read()) + self.assertEqual([True], tag) + + def test_Eject(self): + downloads._POOL_SIZE = 3 + pool = downloads.Pool('.') + + pool.set(1, None, lambda path: file(path, 'w').close()) + coroutine.dispatch() + file1 = pool.get(1).open().name + pool.set(2, None, lambda path: file(path, 'w').close()) + coroutine.dispatch() + file2 = pool.get(2).open().name + pool.set(3, None, lambda path: file(path, 'w').close()) + coroutine.dispatch() + file3 = pool.get(3).open().name + + assert pool.get(1) is not None + assert exists(file1) + assert exists(file1 + '.tag') + assert pool.get(2) is not None + assert exists(file2) + assert exists(file2 + '.tag') + assert pool.get(3) is not None + assert exists(file3) + assert exists(file3 + '.tag') + + pool.set(4, None, lambda path: file(path, 'w').close()) + pool.set(5, None, lambda path: file(path, 'w').close()) + pool.set(6, None, lambda path: file(path, 'w').close()) + + assert pool.get(1) is None + assert not exists(file1) + assert not exists(file1 + '.tag') + assert pool.get(2) is None + assert not exists(file2) + assert not exists(file2 + '.tag') + assert pool.get(3) is None + assert not exists(file3) + assert not exists(file3 + '.tag') + + def test_remove(self): + pool = downloads.Pool('.') + + pool.set(1, None, lambda path: file(path, 'w').close()) + coroutine.dispatch() + file1 = pool.get(1).open().name + assert pool.get(1) is not None + assert exists(file1) + assert exists(file1 + '.tag') + + pool.remove(1) + assert pool.get(1) is None + assert not exists(file1) + assert not exists(file1 + '.tag') + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py index 6e512f9..9cbb6a5 100755 --- a/tests/units/node/sync.py +++ b/tests/units/node/sync.py @@ -3,8 +3,8 @@ import os import uuid +import json from StringIO import StringIO -import cPickle as pickle from os.path import exists from __init__ import tests @@ -17,13 +17,13 @@ class SyncTest(tests.Test): def test_decode(self): stream = StringIO() - pickle.dump({'foo': 'bar'}, stream) + dump({'foo': 'bar'}, stream) stream.seek(0) packets_iter = sync.decode(stream) self.assertRaises(EOFError, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - pickle.dump({'packet': 1, 'bar': 'foo'}, stream) + dump({'packet': 1, 'bar': 'foo'}, stream) stream.seek(0) packets_iter = sync.decode(stream) with next(packets_iter) as packet: @@ -34,7 +34,7 @@ class SyncTest(tests.Test): self.assertRaises(EOFError, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - pickle.dump({'payload': 1}, stream) + dump({'payload': 1}, stream) stream.seek(0) packets_iter = sync.decode(stream) with next(packets_iter) as packet: @@ -45,8 +45,8 @@ class SyncTest(tests.Test): self.assertRaises(EOFError, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - pickle.dump({'packet': 2}, stream) - pickle.dump({'payload': 2}, stream) + dump({'packet': 2}, stream) + dump({'payload': 2}, stream) stream.seek(0) packets_iter = sync.decode(stream) with next(packets_iter) as packet: @@ -62,7 +62,7 @@ class SyncTest(tests.Test): self.assertRaises(EOFError, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - pickle.dump({'packet': 'last'}, stream) + dump({'packet': 'last'}, stream) stream.seek(0) packets_iter = sync.decode(stream) with next(packets_iter) as packet: @@ -84,13 +84,13 @@ class SyncTest(tests.Test): self.assertEqual(len(stream.getvalue()), stream.tell()) stream = StringIO() - pickle.dump({'foo': 'bar'}, stream) + dump({'foo': 'bar'}, stream) stream.seek(0) packets_iter = sync.decode(stream) self.assertRaises(EOFError, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - pickle.dump({'packet': 'last'}, stream) + dump({'packet': 'last'}, stream) stream.seek(0) packets_iter = sync.decode(stream) self.assertRaises(StopIteration, packets_iter.next) @@ -98,13 +98,13 @@ class SyncTest(tests.Test): def test_decode_SkipPackets(self): stream = StringIO() - pickle.dump({'packet': 1}, stream) - pickle.dump({'payload': 1}, stream) - pickle.dump({'payload': 11}, stream) - pickle.dump({'payload': 111}, stream) - pickle.dump({'packet': 2}, stream) - pickle.dump({'payload': 2}, stream) - pickle.dump({'packet': 'last'}, stream) + dump({'packet': 1}, stream) + dump({'payload': 1}, stream) + dump({'payload': 11}, stream) + dump({'payload': 111}, stream) + dump({'packet': 2}, stream) + dump({'payload': 2}, stream) + dump({'packet': 'last'}, stream) stream.seek(0) packets_iter = sync.decode(stream) @@ -133,21 +133,21 @@ class SyncTest(tests.Test): def test_encode(self): self.assertEqual([ - pickle.dumps({'packet': 'last'}), + dumps({'packet': 'last'}), ], [i for i in sync.encode()]) self.assertEqual([ - pickle.dumps({'packet': None, 'foo': 'bar'}), - pickle.dumps({'packet': 'last'}), + dumps({'packet': None, 'foo': 'bar'}), + dumps({'packet': 'last'}), ], [i for i in sync.encode((None, None, None), foo='bar')]) self.assertEqual([ - pickle.dumps({'packet': 1}), - pickle.dumps({'packet': '2', 'n': 2}), - pickle.dumps({'packet': '3', 'n': 3}), - pickle.dumps({'packet': 'last'}), + dumps({'packet': 1}), + dumps({'packet': '2', 'n': 2}), + dumps({'packet': '3', 'n': 3}), + dumps({'packet': 'last'}), ], [i for i in sync.encode( (1, {}, None), @@ -156,16 +156,16 @@ class SyncTest(tests.Test): )]) self.assertEqual([ - pickle.dumps({'packet': 1}), - pickle.dumps({1: 1}), - pickle.dumps({'packet': 2}), - pickle.dumps({2: 2}), - pickle.dumps({2: 2}), - pickle.dumps({'packet': 3}), - pickle.dumps({3: 3}), - pickle.dumps({3: 3}), - pickle.dumps({3: 3}), - pickle.dumps({'packet': 'last'}), + dumps({'packet': 1}), + dumps({1: 1}), + dumps({'packet': 2}), + dumps({2: 2}), + dumps({2: 2}), + dumps({'packet': 3}), + dumps({3: 3}), + dumps({3: 3}), + dumps({3: 3}), + dumps({'packet': 'last'}), ], [i for i in sync.encode( (1, None, [{1: 1}]), @@ -174,8 +174,8 @@ class SyncTest(tests.Test): )]) def test_limited_encode(self): - header_size = len(pickle.dumps({'packet': 'first'})) - record_size = len(pickle.dumps({'record': 0})) + header_size = len(dumps({'packet': 'first'})) + record_size = len(dumps({'record': 0})) def content(): yield {'record': 1} @@ -183,32 +183,32 @@ class SyncTest(tests.Test): yield {'record': 3} i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'record': 1}, pickle.loads(i.send(header_size))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'record': 1}, json.loads(i.send(header_size))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + 1))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + 1))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + record_size * 2, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'record': 1}, pickle.loads(i.send(header_size))) - self.assertEqual({'record': 2}, pickle.loads(i.send(header_size + record_size))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size * 2))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'record': 1}, json.loads(i.send(header_size))) + self.assertEqual({'record': 2}, json.loads(i.send(header_size + record_size))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size * 2))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + record_size * 2, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'record': 1}, pickle.loads(i.send(header_size))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size + 1))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'record': 1}, json.loads(i.send(header_size))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size + 1))) self.assertRaises(StopIteration, i.next) def test_limited_encode_FinalRecords(self): - header_size = len(pickle.dumps({'packet': 'first'})) - record_size = len(pickle.dumps({'record': 0})) + header_size = len(dumps({'packet': 'first'})) + record_size = len(dumps({'record': 0})) def content(): try: @@ -221,24 +221,24 @@ class SyncTest(tests.Test): yield {'record': 5} i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'record': 4}, pickle.loads(i.send(header_size + 1))) - self.assertEqual({'record': 5}, pickle.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'record': 4}, json.loads(i.send(header_size + 1))) + self.assertEqual({'record': 5}, json.loads(i.send(999999999))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'record': 1}, pickle.loads(i.send(header_size))) - self.assertEqual({'record': 4}, pickle.loads(i.send(header_size + record_size * 2 - 1))) - self.assertEqual({'record': 5}, pickle.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'record': 1}, json.loads(i.send(header_size))) + self.assertEqual({'record': 4}, json.loads(i.send(header_size + record_size * 2 - 1))) + self.assertEqual({'record': 5}, json.loads(i.send(999999999))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) self.assertRaises(StopIteration, i.next) def test_limited_encode_Blobs(self): - header_size = len(pickle.dumps({'packet': 'first'})) - blob_header_size = len(pickle.dumps({'blob_size': 100})) - record_size = len(pickle.dumps({'record': 2})) + header_size = len(dumps({'packet': 'first'})) + blob_header_size = len(dumps({'blob_size': 100})) + record_size = len(dumps({'record': 2})) self.touch(('blob', '*' * 100)) def content(): @@ -247,36 +247,36 @@ class SyncTest(tests.Test): yield {'record': 3} i = sync.limited_encode(header_size + blob_header_size + 99, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + blob_header_size + 100, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + blob_header_size + 100 + record_size - 1, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100))) self.assertRaises(StopIteration, i.next) i = sync.limited_encode(header_size + blob_header_size + 100 + record_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'record': 2}, pickle.loads(i.send(header_size + blob_header_size + 100))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100 + record_size))) + self.assertEqual({'record': 2}, json.loads(i.send(header_size + blob_header_size + 100))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100 + record_size))) self.assertRaises(StopIteration, i.next) def test_limited_encode_FinalBlobs(self): - header_size = len(pickle.dumps({'packet': 'first'})) - blob_header_size = len(pickle.dumps({'blob_size': 100})) - record_size = len(pickle.dumps({'record': 2})) + header_size = len(dumps({'packet': 'first'})) + blob_header_size = len(dumps({'blob_size': 100})) + record_size = len(dumps({'record': 2})) self.touch(('blob', '*' * 100)) def content(): @@ -288,49 +288,49 @@ class SyncTest(tests.Test): yield {'record': 3} i = sync.limited_encode(header_size, ('first', None, content()), ('second', None, content())) - self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size))) + self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) + self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) self.assertEqual('*' * 100, i.send(999999999)) - self.assertEqual({'record': 3}, pickle.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999))) + self.assertEqual({'record': 3}, json.loads(i.send(999999999))) + self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) self.assertRaises(StopIteration, i.next) def test_chunked_encode(self): output = sync.chunked_encode() - self.assertEqual({'packet': 'last'}, pickle.loads(decode_chunked(output.read(100)))) + self.assertEqual({'packet': 'last'}, json.loads(decode_chunked(output.read(100)))) data = [{'foo': 1}, {'bar': 2}] - data_stream = pickle.dumps({'packet': 'packet'}) + data_stream = dumps({'packet': 'packet'}) for record in data: - data_stream += pickle.dumps(record) - data_stream += pickle.dumps({'packet': 'last'}) + data_stream += dumps(record) + data_stream += dumps({'packet': 'last'}) output = sync.chunked_encode(('packet', None, iter(data))) - dump = StringIO() + pauload = StringIO() while True: chunk = output.read(1) if not chunk: break - dump.write(chunk) - self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + pauload.write(chunk) + self.assertEqual(data_stream, decode_chunked(pauload.getvalue())) output = sync.chunked_encode(('packet', None, iter(data))) - dump = StringIO() + pauload = StringIO() while True: chunk = output.read(2) if not chunk: break - dump.write(chunk) - self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + pauload.write(chunk) + self.assertEqual(data_stream, decode_chunked(pauload.getvalue())) output = sync.chunked_encode(('packet', None, iter(data))) - dump = StringIO() + pauload = StringIO() while True: chunk = output.read(1000) if not chunk: break - dump.write(chunk) - self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + pauload.write(chunk) + self.assertEqual(data_stream, decode_chunked(pauload.getvalue())) def test_encode_Blobs(self): self.touch(('1', 'a')) @@ -338,15 +338,15 @@ class SyncTest(tests.Test): self.touch(('3', 'ccc')) self.assertEqual([ - pickle.dumps({'packet': 1}), - pickle.dumps({'num': 1, 'blob_size': 1}), + dumps({'packet': 1}), + dumps({'num': 1, 'blob_size': 1}), 'a', - pickle.dumps({'num': 2, 'blob_size': 2}), + dumps({'num': 2, 'blob_size': 2}), 'bb', - pickle.dumps({'packet': 2}), - pickle.dumps({'num': 3, 'blob_size': 3}), + dumps({'packet': 2}), + dumps({'num': 3, 'blob_size': 3}), 'ccc', - pickle.dumps({'packet': 'last'}), + dumps({'packet': 'last'}), ], [i for i in sync.encode( (1, None, [{'num': 1, 'blob': '1'}, {'num': 2, 'blob': '2'}]), @@ -355,15 +355,15 @@ class SyncTest(tests.Test): def test_decode_Blobs(self): stream = StringIO() - pickle.dump({'packet': 1}, stream) - pickle.dump({'num': 1, 'blob_size': 1}, stream) + dump({'packet': 1}, stream) + dump({'num': 1, 'blob_size': 1}, stream) stream.write('a') - pickle.dump({'num': 2, 'blob_size': 2}, stream) + dump({'num': 2, 'blob_size': 2}, stream) stream.write('bb') - pickle.dump({'packet': 2}, stream) - pickle.dump({'num': 3, 'blob_size': 3}, stream) + dump({'packet': 2}, stream) + dump({'num': 3, 'blob_size': 3}, stream) stream.write('ccc') - pickle.dump({'packet': 'last'}, stream) + dump({'packet': 'last'}, stream) stream.seek(0) packets_iter = sync.decode(stream) @@ -383,6 +383,29 @@ class SyncTest(tests.Test): self.assertRaises(StopIteration, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) + def test_decode_SkipNotReadBlobs(self): + stream = StringIO() + dump({'packet': 1}, stream) + dump({'num': 1, 'blob_size': 1}, stream) + stream.write('a') + dump({'num': 2, 'blob_size': 2}, stream) + stream.write('bb') + dump({'packet': 2}, stream) + dump({'num': 3, 'blob_size': 3}, stream) + stream.write('ccc') + dump({'packet': 'last'}, stream) + stream.seek(0) + + packets_iter = sync.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual([1, 2], [i['num'] for i in packet]) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual([3], [i['num'] for i in packet]) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + def test_sneakernet_decode(self): sync.sneakernet_encode([ ('first', {'packet_prop': 1}, [ @@ -469,5 +492,14 @@ def decode_chunked(encdata): return newdata +def dump(value, stream): + stream.write(json.dumps(value)) + stream.write('\n') + + +def dumps(value): + return json.dumps(value) + '\n' + + if __name__ == '__main__': tests.main() diff --git a/tests/units/node/sync_master.py b/tests/units/node/sync_master.py index 9d60e3b..ef09845 100755 --- a/tests/units/node/sync_master.py +++ b/tests/units/node/sync_master.py @@ -2,10 +2,12 @@ # sugar-lint: disable import os +import gzip import time import json import base64 import hashlib +from glob import glob from os.path import join, exists from StringIO import StringIO @@ -20,9 +22,13 @@ from sugar_network.node.master import MasterCommands from sugar_network.resources.volume import Volume from sugar_network.toolkit.router import Router from sugar_network.toolkit import coroutine, util +from sugar_network.toolkit.rrd import Rrd -CHUNK = 100000 +class statvfs(object): + + f_bfree = None + f_frsize = 1 class SyncMasterTest(tests.Test): @@ -32,6 +38,8 @@ class SyncMasterTest(tests.Test): self.uuid = 0 self.override(db, 'uuid', self.next_uuid) + self.override(os, 'statvfs', lambda *args: statvfs()) + statvfs.f_bfree = 999999999 class Document(db.Document): @@ -39,6 +47,7 @@ class SyncMasterTest(tests.Test): def prop(self, value): return value + node.files_root.value = 'sync' self.volume = Volume('master', [Document]) self.master = MasterCommands(self.volume) @@ -60,7 +69,7 @@ class SyncMasterTest(tests.Test): {'commit': [[1, 1]]}, ]), ('pull', {'sequence': [[1, None]]}, None), - ): + dst='localhost:8888'): request.content_stream.write(chunk) request.content_stream.seek(0) @@ -87,7 +96,7 @@ class SyncMasterTest(tests.Test): }}, {'commit': [[2, 2]]}, ]), - ): + dst='localhost:8888'): request.content_stream.write(chunk) request.content_stream.seek(0) @@ -96,6 +105,7 @@ class SyncMasterTest(tests.Test): response.write(chunk) response.seek(0) self.assertEqual([ + ({'packet': 'ack', 'ack': [[2, 2]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []), ({'packet': 'diff', 'src': 'localhost:8888'}, [ {'document': 'document'}, {'guid': '1', 'diff': { @@ -106,702 +116,444 @@ class SyncMasterTest(tests.Test): }}, {'commit': [[1, 1]]}, ]), - ({'packet': 'ack', 'ack': [[2, 2]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []), ], [(packet.props, [i for i in packet]) for packet in sync.decode(response)]) - def __test_push_MisaddressedPackets(self): - master = MasterCommands('master') - response = db.Response() - - packet = OutBufferPacket() - request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - self.assertRaises(RuntimeError, master.push, request, response) - - packet = OutBufferPacket(src='node') + def test_MisaddressedPackets(self): request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - self.assertRaises(RuntimeError, master.push, request, response) + for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None)): + request.content_stream.write(chunk) + request.content_stream.seek(0) + self.assertRaises(RuntimeError, lambda: next(self.master.sync(request))) - packet = OutBufferPacket(dst='master') request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - self.assertRaises(RuntimeError, master.push, request, response) + for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None), dst='fake'): + request.content_stream.write(chunk) + request.content_stream.seek(0) + self.assertRaises(RuntimeError, lambda: next(self.master.sync(request))) - packet = OutBufferPacket(src='node', dst='fake') request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - self.assertRaises(RuntimeError, master.push, request, response) + for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None), dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) + next(self.master.sync(request)) - packet = OutBufferPacket(src='master', dst='master') - request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - self.assertRaises(RuntimeError, master.push, request, response) + def test_push_WithoutCookies(self): + ts = int(time.time()) - packet = OutBufferPacket(src='node', dst='master') request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - master.push(request, response) + for chunk in sync.encode( + ('diff', None, [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + 'prop': {'value': 'value', 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ]), + ('stats_diff', {'dst': 'localhost:8888'}, [ + {'db': 'db', 'user': 'user'}, + {'timestamp': ts, 'values': {'field': 1.0}}, + {'commit': {'user': {'db': [[1, ts]]}}}, + ]), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) - def __test_push_ProcessPushes(self): - master = MasterCommands('master') - request = Request() response = db.Response() - - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}}, - {'cmd': 'sn_push', 'guid': '2', 'diff': {'guid': {'value': '2', 'mtime': 1}}}, - {'cmd': 'sn_push', 'guid': '3', 'diff': {'guid': {'value': '3', 'mtime': 1}}}, - {'cmd': 'sn_commit', 'sequence': [[1, 3]]}, - ]) - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - - reply = master.push(request, response) + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) + self.assertEqual([ + ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[1, 1]], 'dst': None}, []), + ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[1, ts]]}}, 'src': 'localhost:8888', 'dst': None}, []), + ], + [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) self.assertEqual([ 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', ], response.get('Set-Cookie')) - self.assertEqual( - ['1', '2', '3'], - [i.guid for i in master.volume['document'].find()[0]]) - - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual('node', packet.header.get('dst')) - self.assertEqual([ - {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 3]],'merged': [[1, 3]]}, - ], - [i for i in packet]) - - def __test_push_ProcessPulls(self): - master = MasterCommands('master') + def test_push_WithCookies(self): + ts = int(time.time()) - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'sn_pull', 'sequence': [[1, 1]]}, - {'cmd': 'sn_pull', 'sequence': [[3, 4]]}, - {'cmd': 'sn_pull', 'sequence': [[7, None]]}, - ]) request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) + for chunk in sync.encode( + ('pull', {'sequence': [[1, None]]}, None), + ('files_pull', {'sequence': [[1, None]]}, None), + ('diff', None, [ + {'document': 'document'}, + {'guid': '2', 'diff': { + 'guid': {'value': '2', 'mtime': 2}, + 'ctime': {'value': 2, 'mtime': 2}, + 'mtime': {'value': 2, 'mtime': 2}, + 'prop': {'value': 'value', 'mtime': 2}, + }}, + {'commit': [[2, 2]]}, + ]), + ('stats_diff', {'dst': 'localhost:8888'}, [ + {'db': 'db', 'user': 'user'}, + {'timestamp': ts + 1, 'values': {'field': 2.0}}, + {'commit': {'user': {'db': [[2, ts]]}}}, + ]), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) response = db.Response() - reply = master.push(request, response) - assert reply is None + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1], [3, 4], [7, None]]})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', + ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []), + ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[2, ts]]}}, 'src': 'localhost:8888', 'dst': None}, []), ], - response.get('Set-Cookie')) - - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'files_pull', 'directory': 'dir1', 'sequence': [[1, 1]]}, - {'cmd': 'files_pull', 'directory': 'dir1', 'sequence': [[3, 4]]}, - {'cmd': 'files_pull', 'directory': 'dir2', 'sequence': [[7, None]]}, - ]) - request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - - response = db.Response() - reply = master.push(request, response) - assert reply is None + [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'dir1': [[1, 1], [3, 4]], 'dir2': [[7, None]]})), + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[2, None]]), ('files_pull', None, [[1, None]])])), 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'sn_pull', 'sequence': [[1, 1]]}, - {'cmd': 'files_pull', 'directory': 'dir', 'sequence': [[1, 1]]}, - ]) + def test_push_CollectCookies(self): request = Request() - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])) + for chunk in sync.encode( + ('pull', {'sequence': [[11, None]]}, None), + ('files_pull', {'sequence': [[11, None]]}, None), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) response = db.Response() - reply = master.push(request, response) - assert reply is None - self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'dir': [[1, 1]]})), + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) + self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) + self.assertEqual([ + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])), 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - def __test_push_TweakPullAccordingToPush(self): - master = MasterCommands('master') request = Request() + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])) + for chunk in sync.encode( + ('pull', {'sequence': [[1, 5]]}, None), + ('files_pull', {'sequence': [[1, 5]]}, None), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) response = db.Response() - - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}}, - {'cmd': 'sn_commit', 'sequence': [[1, 1]]}, - {'cmd': 'sn_pull', 'sequence': [[1, None]]}, - ]) - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - - reply = master.push(request, response) - - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual('node', packet.header.get('dst')) - self.assertEqual([ - {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 1]], 'merged': [[1, 1]]}, - ], - [i for i in packet]) - self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})), + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) + self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) + self.assertEqual([ + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[1, 5], [10, None]]), ('files_pull', None, [[1, 5], [10, None]])])), 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - def __test_push_DoNotTweakPullAccordingToPushIfCookieWasPassed(self): - master = MasterCommands('master') - request = Request() - response = db.Response() + def test_push_ExcludeAcksFromCookies(self): + ts = int(time.time()) - packet = OutBufferPacket(src='node', dst='master') - packet.push(document='document', data=[ - {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}}, - {'cmd': 'sn_commit', 'sequence': [[1, 1]]}, - {'cmd': 'sn_pull', 'sequence': [[1, None]]}, - ]) - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - - request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ - base64.b64encode(json.dumps({'sn_pull': [[1, None]]})) - - reply = master.push(request, response) + request = Request() + for chunk in sync.encode( + ('pull', {'sequence': [[1, None]]}, None), + ('diff', None, [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + 'prop': {'value': 'value', 'mtime': 1}, + }}, + {'commit': [[10, 10]]}, + ]), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual('node', packet.header.get('dst')) + response = db.Response() + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 1]], 'merged': [[1, 1]]}, + ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []), ], - [i for i in packet]) + [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[2, None]])])), 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - def __test_push_ProcessStatsPushes(self): - master = MasterCommands('master') - request = Request() - response = db.Response() - - ts = int(time.time()) - 1000 - packet = OutBufferPacket(src='node', dst='master') - packet.push(cmd='stats_push', user='user1', db='db1', sequence=[[1, ts + 2]], data=[ - {'timestamp': ts + 1, 'values': {'f': 1}}, - {'timestamp': ts + 2, 'values': {'f': 2}}, - ]) - packet.push(cmd='stats_push', user='user1', db='db2', sequence=[[2, ts + 3]], data=[ - {'timestamp': ts + 3, 'values': {'f': 3}}, - ]) - packet.push(cmd='stats_push', user='user2', db='db3', sequence=[[ts + 4, ts + 4]], data=[ - {'timestamp': ts + 4, 'values': {'f': 4}}, - ]) - request.content_stream = packet.pop() - request.content_length = len(request.content_stream.getvalue()) - - reply = master.push(request, response) - self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('Set-Cookie')) - - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual('node', packet.header.get('dst')) - self.assertEqual([ - {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'stats_ack', - 'sequence': { - 'user1': {'db1': [[1, ts + 2]], 'db2': [[2, ts + 3]]}, - 'user2': {'db3': [[ts + 4, ts + 4]]}, - }, - } - ], - [i for i in packet]) - - __, __, values = rrdtool.fetch('stats/user/us/user1/db1.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 2)) - self.assertEqual([(1,), (2,), (None,)], values) - - __, __, values = rrdtool.fetch('stats/user/us/user1/db2.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 3)) - self.assertEqual([(None,), (None,), (3,), (None,)], values) + def test_push_DoNotExcludeAcksFromExistingCookies(self): + ts = int(time.time()) - __, __, values = rrdtool.fetch('stats/user/us/user2/db3.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 4)) - self.assertEqual([(None,), (None,), (None,), (4,), (None,)], values) - - def __test_pull_ProcessPulls(self): - master = MasterCommands('master') request = Request() - response = db.Response() - - master.volume['document'].create(guid='1') - master.volume['document'].create(guid='2') - - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]])])) + for chunk in sync.encode( + ('diff', None, [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + 'prop': {'value': 'value', 'mtime': 1}, + }}, + {'commit': [[10, 10]]}, + ]), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) - request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) response = db.Response() - reply = master.pull(request, response) - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) - self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', - ], - response.get('Set-Cookie')) - - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, - 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, - 'prop': {'value': '', 'mtime': os.stat('db/document/2/2/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]}, + ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []), ], - [i for i in packet]) - - response = db.Response() - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]])])), + 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - self.assertEqual(3, len([i for i in packet])) - def __test_pull_AvoidEmptyPacketsOnPull(self): - master = MasterCommands('master') - request = Request() - response = db.Response() - - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) + def test_push_ExcludeAcksFromCookiesUsingProperLayer(self): + ts = int(time.time()) request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]])])) + for chunk in sync.encode( + ('pull', {'sequence': [[1, None]], 'layer': 'hidden'}, None), + ('diff', None, [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + 'prop': {'value': 'value', 'mtime': 1}, + }}, + {'commit': [[10, 10]]}, + ]), + dst='localhost:8888'): + request.content_stream.write(chunk) + request.content_stream.seek(0) + response = db.Response() - reply = master.pull(request, response) - assert reply is None - self.assertEqual('application/x-tar', response.content_type) + reply = StringIO() + for chunk in self.master.push(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []), ], - response.get('Set-Cookie')) - - def __test_pull_LimittedPull(self): - master = MasterCommands('master') - - master.volume['document'].create(guid='1', prop='*' * CHUNK) - master.volume['document'].create(guid='2', prop='*' * CHUNK) - - request = Request() - response = db.Response() - reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) - - request = Request() - response = db.Response() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) - reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + [(packet.props, [i for i in packet]) for packet in sync.decode(reply)]) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', 'hidden', [[2, None]]), ('pull', None, [[1, None]])])), + 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - self.assertEqual([ - {'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, - 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime}, - }}, - {'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 1]]}, - ], - [i for i in packet]) + def test_pull(self): + self.volume['document'].create(guid='1', prop='1', ctime=1, mtime=1) + self.volume['document'].create(guid='2', prop='2', ctime=2, mtime=2) + self.utime('master', 0) + self.touch(('sync/1', 'file1')) + self.touch(('sync/2', 'file2')) request = Request() + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])) response = db.Response() - reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + self.assertEqual(None, self.master.pull(request, response)) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})), + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])), 'sugar_network_delay=30; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - self.assertEqual(2, len([i for i in packet])) - - for i in master._pull_queue.values(): - i.unlink() - master._pull_queue.clear() - - request = Request() - response = db.Response() - reply = master.pull(request, response, accept_length=CHUNK * 2.5, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) + coroutine.sleep(.5) request = Request() + request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0] response = db.Response() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) - reply = master.pull(request, response, accept_length=CHUNK * 2.5, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + reply = StringIO() + for chunk in self.master.pull(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + ({'packet': 'diff'}, [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'prop': {'value': '1', 'mtime': 0}, + 'guid': {'value': '1', 'mtime': 0}, + 'ctime': {'value': 1, 'mtime': 0}, + 'mtime': {'value': 1, 'mtime': 0}}, + }, + {'guid': '2', 'diff': { + 'prop': {'value': '2', 'mtime': 0}, + 'guid': {'value': '2', 'mtime': 0}, + 'ctime': {'value': 2, 'mtime': 0}, + 'mtime': {'value': 2, 'mtime': 0}}, + }, + {'commit': [[1, 2]]}, + ]) ], - response.get('Set-Cookie')) - - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) + [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) self.assertEqual([ - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, - 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, - 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/2/2/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]}, + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])), + 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], - [i for i in packet]) - - def __test_pull_ReusePullSeqFromCookies(self): - master = MasterCommands('master') - request = Request() - response = db.Response() - - master.volume['document'].create(guid='1') - - request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ - base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'foo': [[2, 2]]})) - - reply = master.pull(request, response) - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'foo': [[2, 2]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) + response.get('Set-Cookie')) request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) + request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0] response = db.Response() - reply = master.pull(request, response) - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + reply = StringIO() + for chunk in self.master.pull(request, response): + reply.write(chunk) + reply.seek(0) + packets_iter = sync.decode(gzip.GzipFile(mode='r', fileobj=reply)) + with next(packets_iter) as packet: + self.assertEqual('files_diff', packet.name) + records_iter = iter(packet) + self.assertEqual('file1', next(records_iter)['blob'].read()) + self.assertEqual('file2', next(records_iter)['blob'].read()) + self.assertEqual({'op': 'commit', 'sequence': [[1, 4]]}, next(records_iter)) + self.assertRaises(StopIteration, records_iter.next) + self.assertRaises(StopIteration, packets_iter.next) self.assertEqual([ 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', ], response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - self.assertEqual([ - {'cookie': {}, 'filename': 'master-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, - 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-1.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 1]]}, - ], - [i for i in packet]) - - def __test_pull_AskForNotYetReadyPull(self): - master = MasterCommands('master') - - def diff(*args, **kwargs): - for i in range(1024): - yield str(i), i, {} - coroutine.sleep(.1) - - self.override(Directory, 'diff', diff) + def test_pull_EmptyPackets(self): + self.master._pulls = { + 'pull': lambda layer, seq, out_seq=None: \ + ('diff', None, [{'layer': layer, 'seq': seq}]), + } request = Request() + request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]])])) response = db.Response() - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) + self.assertEqual(None, self.master.pull(request, response)) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]])])), 'sugar_network_delay=30; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - - coroutine.sleep(1) + coroutine.sleep(.5) + self.assertEqual(1, len([i for i in glob('tmp/pulls/*.tag')])) request = Request() + request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0] response = db.Response() - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) + self.assertEqual(None, self.master.pull(request, response)) self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', + 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', + 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', ], response.get('Set-Cookie')) + self.assertEqual(0, len([i for i in glob('tmp/pulls/*.tag')])) - def __test_clone(self): - master = MasterCommands('master') - request = Request() - response = db.Response() + def test_pull_FullClone(self): - master.volume['document'].create(guid='1') - master.volume['document'].create(guid='2') + def diff(layer, seq, out_seq): + out_seq.include(1, 10) + yield {'layer': layer, 'seq': seq} - reply = master.pull(request, response) - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) + self.master._pulls = { + 'pull': lambda layer, seq, out_seq: ('diff', None, diff(layer, seq, out_seq)), + 'files_pull': lambda layer, seq, out_seq: ('files_diff', None, diff(layer, seq, out_seq)), + } request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) response = db.Response() - reply = master.pull(request, response) - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + self.assertEqual(None, self.master.pull(request, response)) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])), + 'sugar_network_delay=30; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) + coroutine.sleep(.5) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - self.assertEqual([ - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, - 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, - 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, - 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, - 'prop': {'value': '', 'mtime': os.stat('db/document/2/2/prop').st_mtime}, - }}, - {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]}, - ], - [i for i in packet]) - - def __test_pull_ProcessFilePulls(self): - node.sync_dirs.value = ['files'] - seqno = util.Seqno('seqno') - master = MasterCommands('master') request = Request() + request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0] response = db.Response() - - self.touch(('files/1', '1')) - self.touch(('files/2', '2')) - self.touch(('files/3', '3')) - self.utime('files', 1) - - reply = master.pull(request, response, files='[[1, null]]') - assert reply is None - self.assertEqual(None, response.content_type) - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'files': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - coroutine.sleep(1) - - request = Request() - request.environ['HTTP_COOKIE'] = ';'.join(cookie) - response = db.Response() - reply = master.pull(request, response) - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + reply = StringIO() + for chunk in self.master.pull(request, response): + reply.write(chunk) + reply.seek(0) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + ({'packet': 'diff'}, [{'layer': None, 'seq': [[1, None]]}]), ], - response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('master', packet.header['src']) - self.assertEqual(None, packet.header.get('dst')) - - response = db.Response() - reply = master.pull(request, response, files='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) self.assertEqual([ - 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', - 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \ + base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])), + 'sugar_network_delay=0; Max-Age=3600; HttpOnly', ], response.get('Set-Cookie')) - self.assertEqual( - sorted([ - {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'}, - {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '2', 'content_type': 'blob', 'path': '2'}, - {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '3'}, - {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 3]]}, - ]), - read_records(reply)) - - def __test_ReuseCachedPulls(self): - master = MasterCommands('master') - - cached_pull = join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull') - with OutPacket(stream=file(cached_pull, 'w'), probe='test', cookie={}) as packet: - packet.push(data=[None]) request = Request() + request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0] response = db.Response() - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) + reply = StringIO() + for chunk in self.master.pull(request, response): + reply.write(chunk) + reply.seek(0) + self.assertEqual([ + ({'packet': 'files_diff'}, [{'layer': None, 'seq': [[1, None]]}]), + ], + [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))]) self.assertEqual([ 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', ], response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('test', packet.header['probe']) - for i in master._pull_queue.values(): - i.unlink() - master._pull_queue.clear() - - cached_pull = join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull') - with OutPacket(stream=file(cached_pull, 'w'), probe='test', cookie={'sn_pull': [[2, None]]}) as packet: - packet.push(data=[None]) - - request = Request() - response = db.Response() - reply = master.pull(request, response, sn_pull='[[1, null]]') - assert reply is not None - self.assertEqual('application/x-tar', response.content_type) - self.assertEqual([ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})), - 'sugar_network_delay=0; Max-Age=3600; HttpOnly', - ], - response.get('Set-Cookie')) - packet = InPacket(stream=reply) - self.assertEqual('test', packet.header['probe']) + def __test_pull_LimittedPull(self): + pass - def __test_UnlinkCachedPullsOnEjectionFromQueue(self): - sync_master._PULL_QUEUE_SIZE = 1 - master = MasterCommands('master') + def __test_pull_ReusePullSeqFromCookies(self): + pass - master.volume['document'].create(guid='1') - master.volume['document'].create(guid='2') + def __test_pull_AskForNotYetReadyPull(self): + pass - response = db.Response() - reply = master.pull(Request(), response, sn_pull='[[1, null]]') - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - assert exists(join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull')) + def __test_pull_ProcessFilePulls(self): + pass - response = db.Response() - reply = master.pull(Request(), response, sn_pull='[[2, null]]') - cookie = [ - 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})), - 'sugar_network_delay=30; Max-Age=3600; HttpOnly', - ] - self.assertEqual(cookie, response.get('Set-Cookie')) - assert not exists(join('tmp', pull_hash({'sn_push': [[1, None]]}) + '.pull')) - assert exists(join('tmp', pull_hash({'sn_pull': [[2, None]]}) + '.pull')) + def __test_ReuseCachedPulls(self): + pass def pull_hash(seq): @@ -812,6 +564,7 @@ class Request(object): def __init__(self): self.content_stream = StringIO() + self.environ = {} if __name__ == '__main__': |