author     Aleksey Lim <alsroot@sugarlabs.org>    2013-03-01 15:29:59 (GMT)
committer  Aleksey Lim <alsroot@sugarlabs.org>    2013-03-01 15:29:59 (GMT)
commit     a6e32740d3a374a0f07f0bebabaa50395480f0a0 (patch)
tree       31341280866c910a67427c27d7e66823f2df27cd
parent     fa1586847d81cc71eec09cce5d98fa79800cd6fc (diff)
Fix sync on master side
-rwxr-xr-x  sugar-network-client                 9
-rwxr-xr-x  sugar-network-node                   6
-rw-r--r--  sugar_network/node/__init__.py       5
-rw-r--r--  sugar_network/node/downloads.py    128
-rw-r--r--  sugar_network/node/master.py       168
-rw-r--r--  sugar_network/node/slave.py          9
-rw-r--r--  sugar_network/node/sync.py          42
-rw-r--r--  sugar_network/toolkit/__init__.py    9
-rw-r--r--  sugar_network/toolkit/util.py        6
-rw-r--r--  tests/__init__.py                    5
-rw-r--r--  tests/units/node/__main__.py         1
-rwxr-xr-x  tests/units/node/downloads.py      119
-rwxr-xr-x  tests/units/node/sync.py           242
-rwxr-xr-x  tests/units/node/sync_master.py    879
14 files changed, 904 insertions, 724 deletions
diff --git a/sugar-network-client b/sugar-network-client
index 5d7c652..c2f4a81 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -52,8 +52,8 @@ class Application(application.Daemon):
application.logdir.value = join(client.local_root.value, 'log')
else:
application.logdir.value = sugar.profile_path('logs')
- if not exists(toolkit.tmpdir.value):
- os.makedirs(toolkit.tmpdir.value)
+ if not exists(toolkit.cachedir.value):
+ os.makedirs(toolkit.cachedir.value)
application.rundir.value = join(client.local_root.value, 'run')
coroutine.signal(signal.SIGCHLD, self.__SIGCHLD_cb)
@@ -196,12 +196,13 @@ application.debug.value = sugar.logger_level()
node.trust_users.value = True
# If tmpfs is mounted to /tmp, `os.statvfs()` will return 0 free space
# and will break offline synchronization logic
-toolkit.tmpdir.value = sugar.profile_path('tmp')
+toolkit.cachedir.value = sugar.profile_path('tmp')
Option.seek('main', application)
+Option.seek('main', [toolkit.cachedir])
Option.seek('webui', webui)
Option.seek('client', client)
-Option.seek('client', [sugar.keyfile, toolkit.tmpdir])
+Option.seek('client', [sugar.keyfile])
Option.seek('node', [node.port, node.files_root])
Option.seek('stats', stats_user)
Option.seek('db', db)
diff --git a/sugar-network-node b/sugar-network-node
index f1a58ee..4cceb30 100755
--- a/sugar-network-node
+++ b/sugar-network-node
@@ -41,8 +41,8 @@ class Application(application.Daemon):
jobs = coroutine.Pool()
def ensure_run(self):
- if toolkit.tmpdir.value and not exists(toolkit.tmpdir.value):
- os.makedirs(toolkit.tmpdir.value)
+ if toolkit.cachedir.value and not exists(toolkit.cachedir.value):
+ os.makedirs(toolkit.cachedir.value)
if not exists(node.data_root.value):
os.makedirs(node.data_root.value)
enforce(os.access(node.data_root.value, os.W_OK),
@@ -121,10 +121,10 @@ monkey.patch_time()
locale.setlocale(locale.LC_ALL, '')
Option.seek('main', application)
+Option.seek('main', [toolkit.cachedir])
Option.seek('webui', webui)
Option.seek('client', [client.api_url])
Option.seek('node', node)
-Option.seek('node', [toolkit.tmpdir])
Option.seek('node-stats', stats_node)
Option.seek('user-stats', stats_user)
Option.seek('obs', obs)
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index 08c8eea..f0d51c8 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -58,3 +58,8 @@ stats_root = Option(
files_root = Option(
'path to a directory to keep files synchronized between nodes',
default='/var/lib/sugar-network/files', name='files_root')
+
+pull_timeout = Option(
+ 'delay in seconds returned to a sync-pull requester, telling it '
+ 'how long to wait before the pull request is ready',
+ default=30, type_cast=int)
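
The `pull_timeout` value surfaces on the wire as the `sugar_network_delay` cookie set by the master's `pull` command (see `master.py` below), telling the requester how long to wait before retrying. A minimal client-side polling sketch using only the stdlib; the endpoint URL, retry budget, and helper name are assumptions, only the cookie name comes from this change:

import time
import urllib2
from Cookie import SimpleCookie

def poll_pull(url, attempts=10):
    # Hypothetical helper: retry a sync-pull request until the master
    # stops asking us to wait via the `sugar_network_delay` cookie.
    for __ in range(attempts):
        response = urllib2.urlopen(url)
        cookie = SimpleCookie()
        for header in response.headers.getheaders('Set-Cookie'):
            cookie.load(header)
        delay = cookie.get('sugar_network_delay')
        if delay is None or not delay.value.isdigit() or not int(delay.value):
            return response           # payload is ready (or nothing to pull)
        time.sleep(int(delay.value))  # master is still preparing the pull
    return None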
diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py
new file mode 100644
index 0000000..b2bd667
--- /dev/null
+++ b/sugar_network/node/downloads.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2013 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Persistent pool with temporary files prepared to download."""
+
+import os
+import json
+import hashlib
+import logging
+from glob import glob
+from os.path import join, splitext, exists
+
+from sugar_network.toolkit import pylru, coroutine, exception
+
+
+# Maximum number of postponed pulls the master can handle at the same time
+_POOL_SIZE = 256
+_TAG_SUFFIX = '.tag'
+
+_logger = logging.getLogger('node.downloads')
+
+
+class Pool(object):
+
+ def __init__(self, root):
+ self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop())
+ if not exists(root):
+ os.makedirs(root)
+ self._root = root
+
+ for tag_path in glob(join(root, '*.tag')):
+ path, __ = splitext(tag_path)
+ if exists(path):
+ try:
+ with file(tag_path) as f:
+ key, tag = json.load(f)
+ pool_key = json.dumps(key)
+ self._pool[pool_key] = _Download(key, tag, path)
+ continue
+ except Exception:
+ exception('Cannot open %r download, recreate', tag_path)
+ os.unlink(path)
+ os.unlink(tag_path)
+
+ def get(self, key):
+ key = json.dumps(key)
+ if key in self._pool:
+ return self._pool[key]
+
+ def set(self, key, tag, fetcher, *args, **kwargs):
+ pool_key = json.dumps(key)
+ path = join(self._root, hashlib.sha1(pool_key).hexdigest())
+
+ def do_fetch():
+ try:
+ complete = fetcher(*args, path=path, **kwargs)
+ except Exception:
+ exception('Error while fetching %r', self)
+ if exists(path):
+ os.unlink(path)
+ return True
+ with file(path + _TAG_SUFFIX, 'w') as f:
+ json.dump([key, tag], f)
+ return complete
+
+ job = coroutine.spawn(do_fetch)
+ dl = self._pool[pool_key] = _Download(key, tag, path, job)
+ return dl
+
+ def remove(self, key):
+ key = json.dumps(key)
+ if key in self._pool:
+ self._pool.peek(key).pop()
+ del self._pool[key]
+
+
+class _Download(dict):
+
+ def __init__(self, key, tag, path, job=None):
+ self.tag = tag
+ self._key = key
+ self._path = path
+ self._job = job
+
+ def __repr__(self):
+ return '<Download %r path=%r>' % (self._key, self._path)
+
+ @property
+ def ready(self):
+ # pylint: disable-msg=E1101
+ return self._job is None or self._job.dead
+
+ @property
+ def complete(self):
+ return self._job is not None and self._job.value
+
+ @property
+ def length(self):
+ if exists(self._path):
+ return os.stat(self._path).st_size
+
+ def open(self):
+ if exists(self._path):
+ return file(self._path, 'rb')
+
+ def pop(self):
+ if self._job is not None:
+ _logger.debug('Abort fetching %r', self)
+ self._job.kill()
+
+ if exists(self._path):
+ os.unlink(self._path)
+ if exists(self._path + _TAG_SUFFIX):
+ os.unlink(self._path + _TAG_SUFFIX)
+
+ _logger.debug('Throw out %r from the pool', self)
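
A minimal usage sketch of this pool, assuming the `sugar_network` tree is importable and the coroutine hub is dispatched manually as in the unit tests below; the key, directory, and fetcher are made up for illustration (the real caller passes `sync.sneakernet_encode`):

from sugar_network.node import downloads
from sugar_network.toolkit import coroutine

def fetch(path):
    # Stand-in fetcher; returning True marks the download as complete.
    with file(path, 'w') as f:
        f.write('payload')
    return True

pool = downloads.Pool('/tmp/pulls')    # state survives process restarts
dl = pool.set(('pull', None), None, fetch)
coroutine.dispatch()                   # let the spawned fetch job finish
if dl.ready and dl.complete:
    print dl.open().read()             # -> 'payload'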
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index b6a387b..dd7ff22 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -13,14 +13,17 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import base64
import logging
from urlparse import urlsplit
+from Cookie import SimpleCookie
from os.path import join
-from sugar_network import db, client
-from sugar_network.node import sync, stats_user, files_root, files, volume
+from sugar_network import db, client, node
+from sugar_network.node import sync, stats_user, files, volume, downloads
from sugar_network.node.commands import NodeCommands
-from sugar_network.toolkit import util
+from sugar_network.toolkit import cachedir, util, enforce
_logger = logging.getLogger('node.master')
@@ -31,29 +34,107 @@ class MasterCommands(NodeCommands):
def __init__(self, volume_):
guid = urlsplit(client.api_url.value).netloc
NodeCommands.__init__(self, True, guid, volume_)
+
+ self._pulls = {
+ 'pull': lambda layer, seq, out_seq=None:
+ ('diff', None, volume.diff(self.volume, seq, out_seq, layer)),
+ 'files_pull': lambda layer, seq, out_seq=None:
+ ('files_diff', None, self._files.diff(seq, out_seq)),
+ }
+
+ self._pull_queue = downloads.Pool(join(cachedir.value, 'pulls'))
self._files = None
- if files_root.value:
- self._files = files.Index(files_root.value,
+ if node.files_root.value:
+ self._files = files.Index(node.files_root.value,
join(volume_.root, 'files.index'), volume_.seqno)
@db.volume_command(method='POST', cmd='sync',
permissions=db.ACCESS_AUTH)
def sync(self, request):
+ reply, cookie = self._push(request)
+ for op, layer, seq in cookie:
+ reply.append(self._pulls[op](layer, seq))
+ return sync.encode(src=self.guid, *reply)
+
+ @db.volume_command(method='POST', cmd='push')
+ def push(self, request, response):
+ reply, cookie = self._push(request)
+ # Read the passed cookie only after excluding `merged_seq`.
+ # If there is a `pull` outside of the currently pushed packet,
+ # excluding `merged_seq` should not affect it.
+ cookie.update(_Cookie(request))
+ cookie.store(response)
+ return sync.encode(src=self.guid, *reply)
+
+ @db.volume_command(method='GET', cmd='pull',
+ mime_type='application/octet-stream',
+ arguments={'accept_length': db.to_int})
+ def pull(self, request, response, accept_length=None):
+ cookie = _Cookie(request)
+ if not cookie:
+ _logger.warning('Requested full dump in pull command')
+ cookie.append(('pull', None, [[1, None]]))
+ cookie.append(('files_pull', None, [[1, None]]))
+
+ reply = None
+ for pull_key in cookie:
+ op, layer, seq = pull_key
+
+ pull = self._pull_queue.get(pull_key)
+ if pull is not None:
+ if not pull.ready:
+ continue
+ if not pull.tag:
+ self._pull_queue.remove(pull_key)
+ cookie.remove(pull_key)
+ continue
+ if accept_length is None or pull.length <= accept_length:
+ _logger.debug('Found ready to use %r', pull)
+ if pull.complete:
+ cookie.remove(pull_key)
+ else:
+ seq.exclude(pull.tag)
+ reply = pull.open()
+ break
+ _logger.debug('Existing %r is too big, will recreate', pull)
+ self._pull_queue.remove(pull_key)
+
+ out_seq = util.Sequence()
+ pull = self._pull_queue.set(pull_key, out_seq,
+ sync.sneakernet_encode,
+ [self._pulls[op](layer, seq, out_seq)],
+ limit=accept_length, src=self.guid)
+ _logger.debug('Start new %r', pull)
+
+ if reply is None:
+ if cookie:
+ _logger.debug('No ready pulls')
+ # TODO Might be useful to set a meaningful value here
+ cookie.delay = node.pull_timeout.value
+ else:
+ _logger.debug('Nothing to pull')
+
+ cookie.store(response)
+ return reply
+
+ def _push(self, request):
reply = []
- merged_seq = util.Sequence([])
+ cookie = _Cookie()
pull_seq = None
+ merged_seq = util.Sequence([])
for packet in sync.decode(request.content_stream):
+ src = packet['src']
+ enforce(packet['dst'] == self.guid, 'Misaddressed packet')
+
if packet.name == 'pull':
- pull_seq = util.Sequence(packet['sequence'])
- reply.append(('diff', {}, volume.diff(self.volume, pull_seq)))
+ pull_seq = cookie['pull', packet['layer']]
+ pull_seq.include(packet['sequence'])
elif packet.name == 'files_pull':
if self._files is not None:
- seq = util.Sequence(packet['sequence'])
- reply.append(('files_diff', None, self._files.diff(seq)))
+ cookie['files_pull'].include(packet['sequence'])
elif packet.name == 'diff':
- src = packet['src']
seq, ack_seq = volume.merge(self.volume, packet)
reply.append(('ack', {
'ack': ack_seq,
@@ -62,12 +143,65 @@ class MasterCommands(NodeCommands):
}, None))
merged_seq.include(ack_seq)
elif packet.name == 'stats_diff':
- src = packet['src']
- seq = stats_user.merge(packet)
- reply.append(('stats_ack',
- {'sequence': seq, 'dst': src}, None))
+ reply.append(('stats_ack', {
+ 'sequence': stats_user.merge(packet),
+ 'dst': src,
+ }, None))
- if pull_seq:
+ if pull_seq is not None:
pull_seq.exclude(merged_seq)
- return sync.encode(src=self.guid, *reply)
+ return reply, cookie
+
+
+class _Cookie(list):
+
+ def __init__(self, request=None):
+ list.__init__(self)
+ if request is not None:
+ self.update(self._get_cookie(request, 'sugar_network_sync') or [])
+ self.delay = 0
+
+ def update(self, that):
+ for op, layer, seq in that:
+ self[op, layer].include(seq)
+
+ def store(self, response):
+ if self:
+ _logger.debug('Postpone %r in cookie', self)
+ to_store = base64.b64encode(json.dumps(self))
+ self._set_cookie(response, 'sugar_network_sync', to_store)
+ self._set_cookie(response, 'sugar_network_delay', self.delay)
+ else:
+ self._unset_cookie(response, 'sugar_network_sync')
+ self._unset_cookie(response, 'sugar_network_delay')
+
+ def __getitem__(self, key):
+ if not isinstance(key, tuple):
+ key = (key, None)
+ for op, layer, seq in self:
+ if (op, layer) == key:
+ return seq
+ seq = util.Sequence()
+ self.append(key + (seq,))
+ return seq
+
+ def _get_cookie(self, request, name):
+ cookie_str = request.environ.get('HTTP_COOKIE')
+ if not cookie_str:
+ return
+ cookie = SimpleCookie()
+ cookie.load(cookie_str)
+ if name not in cookie:
+ return
+ value = cookie.get(name).value
+ if value != 'unset_%s' % name:
+ return json.loads(base64.b64decode(value))
+
+ def _set_cookie(self, response, name, value, age=3600):
+ response.setdefault('Set-Cookie', [])
+ cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age)
+ response['Set-Cookie'].append(cookie)
+
+ def _unset_cookie(self, response, name):
+ self._set_cookie(response, name, 'unset_%s' % name, 0)
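
The sync state carried between requests is just a base64-encoded JSON list of `(op, layer, sequence)` triples, so it can be inspected without the server. A round-trip sketch using only the encoding shown above; the sequence values are examples:

import json
import base64

# Encode pending pulls the way _Cookie.store() does.
pending = [('pull', None, [[2, None]]), ('files_pull', None, [[1, None]])]
header = 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
        base64.b64encode(json.dumps(pending))

# Decode the value back the way _Cookie._get_cookie() does.
value = header.split('=', 1)[1].split(';', 1)[0]
assert json.loads(base64.b64decode(value)) == \
        [['pull', None, [[2, None]]], ['files_pull', None, [[1, None]]]]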
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 0f6f5a4..edd6a3f 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -47,15 +47,16 @@ class SlaveCommands(NodeCommands):
@db.volume_command(method='POST', cmd='online_sync',
permissions=db.ACCESS_LOCAL)
def online_sync(self):
- push = [('diff', {'src': self.guid},
- volume.diff(self.volume, self._push_seq)),
+ push = [('diff', None, volume.diff(self.volume, self._push_seq)),
('pull', {'sequence': self._pull_seq}, None),
('files_pull', {'sequence': self._files_seq}, None),
]
if stats_user.stats_user.value:
- push.append(('stats_diff', {'src': self.guid}, stats_user.diff()))
+ push.append(('stats_diff', None, stats_user.diff()))
response = Client().request('POST',
- data=sync.chunked_encode(*push), params={'cmd': 'sync'},
+ data=sync.chunked_encode(*push,
+ src=self.guid, dst=self._master_guid),
+ params={'cmd': 'sync'},
headers={'Transfer-Encoding': 'chunked'})
self._import(sync.decode(response.raw), None)
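
Packets are now explicitly addressed: the slave stamps `src`/`dst` into the stream header, and the master rejects anything whose `dst` is not its own GUID ('Misaddressed packet' in `master.py` above). A minimal sketch of the addressing, assuming the `sugar_network` tree is importable; the GUIDs are placeholders:

from StringIO import StringIO
from sugar_network.node import sync

stream = StringIO()
for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None),
        src='slave-guid', dst='master-guid'):
    stream.write(chunk)
stream.seek(0)

for packet in sync.decode(stream):
    # The master checks this for every decoded packet and raises
    # RuntimeError on a mismatch.
    assert packet['dst'] == 'master-guid'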
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
index 010bb9b..8a925da 100644
--- a/sugar_network/node/sync.py
+++ b/sugar_network/node/sync.py
@@ -15,13 +15,13 @@
import os
import gzip
+import json
import logging
-import cPickle as pickle
from types import GeneratorType
from os.path import exists, join, dirname
from sugar_network import db
-from sugar_network.toolkit import BUFFER_SIZE, util, enforce
+from sugar_network.toolkit import BUFFER_SIZE, coroutine, util, enforce
# Leave at least n bytes in fs while calling `encode_to_file()`
@@ -48,8 +48,8 @@ def limited_encode(limit, *packets, **header):
return _encode(limit, packets, header, _EncodingStatus())
-def chunked_encode(*packets):
- return _ChunkedEncoder(encode(*packets))
+def chunked_encode(*packets, **header):
+ return _ChunkedEncoder(encode(*packets, **header))
def sneakernet_decode(root, node=None, session=None):
@@ -59,7 +59,7 @@ def sneakernet_decode(root, node=None, session=None):
continue
zipfile = gzip.open(join(root, filename), 'rb')
try:
- package_props = pickle.load(zipfile)
+ package_props = json.loads(zipfile.readline())
if node is not None and package_props.get('src') == node:
if package_props.get('session') == session:
@@ -95,7 +95,8 @@ def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
with file(path, 'wb') as package:
zipfile = gzip.GzipFile(fileobj=package)
try:
- pickle.dump(header, zipfile)
+ json.dump(header, zipfile)
+ zipfile.write('\n')
pos = None
encoder = _encode(limit, packets, None, status)
@@ -104,6 +105,7 @@ def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
chunk = encoder.send(pos)
zipfile.write(chunk)
pos = zipfile.fileobj.tell()
+ coroutine.dispatch()
except StopIteration:
break
@@ -135,7 +137,7 @@ def _encode(limit, packets, header, status):
if header:
props.update(header)
props['packet'] = packet
- pos = (yield pickle.dumps(props)) or 0
+ pos = (yield json.dumps(props) + '\n') or 0
if content is None:
continue
@@ -150,7 +152,7 @@ def _encode(limit, packets, header, status):
blob = record.pop('blob')
blob_size = record['blob_size'] = os.stat(blob).st_size
- dump = pickle.dumps(record)
+ dump = json.dumps(record) + '\n'
if not status.aborted and limit is not None and \
pos + len(dump) + blob_size > limit:
status.aborted = True
@@ -158,7 +160,7 @@ def _encode(limit, packets, header, status):
raise StopIteration()
record = content.throw(StopIteration())
continue
- pos = (yield pickle.dumps(record)) or 0
+ pos = (yield dump) or 0
if blob is not None:
with file(blob, 'rb') as f:
@@ -172,7 +174,7 @@ def _encode(limit, packets, header, status):
except StopIteration:
pass
- yield pickle.dumps({'packet': 'last'})
+ yield json.dumps({'packet': 'last'}) + '\n'
class _ChunkedEncoder(object):
@@ -235,12 +237,16 @@ class _PacketsIterator(object):
return self.props.get(key)
def __iter__(self):
+ blob = None
while True:
- try:
- record = pickle.load(self._stream)
- except EOFError:
+ if blob is not None and blob.size_to_read:
+ self._stream.seek(blob.size_to_read, 1)
+ blob = None
+ record = self._stream.readline()
+ if not record:
self._name = None
- raise
+ raise EOFError()
+ record = json.loads(record)
packet = record.get('packet')
if packet:
self._name = packet
@@ -249,7 +255,7 @@ class _PacketsIterator(object):
break
blob_size = record.get('blob_size')
if blob_size:
- record['blob'] = _Blob(self._stream, blob_size)
+ blob = record['blob'] = _Blob(self._stream, blob_size)
yield record
def __enter__(self):
@@ -263,9 +269,9 @@ class _Blob(object):
def __init__(self, stream, size):
self._stream = stream
- self._size_to_read = size
+ self.size_to_read = size
def read(self, size=BUFFER_SIZE):
- chunk = self._stream.read(min(size, self._size_to_read))
- self._size_to_read -= len(chunk)
+ chunk = self._stream.read(min(size, self.size_to_read))
+ self.size_to_read -= len(chunk)
return chunk
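
With `cPickle` gone, a stream is one JSON object per line, and a record that advertises `blob_size` is followed by exactly that many raw bytes. Decoding such a stream by hand, as a sketch assuming only the framing implemented above:

import json
from StringIO import StringIO

stream = StringIO()
stream.write(json.dumps({'packet': 'diff'}) + '\n')
stream.write(json.dumps({'guid': '1', 'blob_size': 3}) + '\n')
stream.write('abc')                            # raw blob bytes, unframed
stream.write(json.dumps({'packet': 'last'}) + '\n')
stream.seek(0)

while True:
    line = stream.readline()
    if not line:
        break
    record = json.loads(line)
    print record
    if record.get('blob_size'):
        print 'blob:', stream.read(record['blob_size'])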
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index ffb0b8f..7673016 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -23,11 +23,10 @@ from sugar_network.toolkit.options import Option
BUFFER_SIZE = 1024 * 10
-tmpdir = Option(
- 'if specified, use this directory for temporary files; such files '
- 'might take considerable number of bytes while downloading of '
- 'synchronizing Sugar Network content',
- name='tmpdir')
+cachedir = Option(
+ 'path to a directory to keep cached files; such files '
+ 'might take a considerable number of bytes',
+ default='/var/cache/sugar-network', name='cachedir')
def enforce(condition, error=None, *args):
diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py
index dcbe2db..727edba 100644
--- a/sugar_network/toolkit/util.py
+++ b/sugar_network/toolkit/util.py
@@ -24,7 +24,7 @@ import collections
from os.path import exists, join, islink, isdir, dirname, basename, abspath
from os.path import lexists, isfile
-from sugar_network.toolkit import tmpdir, enforce
+from sugar_network.toolkit import cachedir, enforce
_logger = logging.getLogger('toolkit.util')
@@ -306,8 +306,8 @@ def unique_filename(root, filename):
def NamedTemporaryFile(*args, **kwargs):
- if tmpdir.value:
- kwargs['dir'] = tmpdir.value
+ if cachedir.value:
+ kwargs['dir'] = cachedir.value
return tempfile.NamedTemporaryFile(*args, **kwargs)
diff --git a/tests/__init__.py b/tests/__init__.py
index 4e73db1..92c0fae 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -27,7 +27,7 @@ from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.resources.implementation import Implementation
from sugar_network.node.commands import NodeCommands
-from sugar_network.node import stats_user, stats_node, obs, auth, slave
+from sugar_network.node import stats_user, stats_node, obs, auth, slave, downloads
from sugar_network.resources.volume import Volume
@@ -104,11 +104,12 @@ class Test(unittest.TestCase):
obs._client = None
http._RECONNECTION_NUMBER = 0
auth.reset()
- toolkit.tmpdir.value = tmpdir + '/tmp'
+ toolkit.cachedir.value = tmpdir + '/tmp'
injector.invalidate_solutions(None)
injector._pms_path = None
journal._ds_root = tmpdir + '/datastore'
solver.nodeps = False
+ downloads._POOL_SIZE = 256
Volume.RESOURCES = [
'sugar_network.resources.user',
diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py
index 4a0d50c..684e1b7 100644
--- a/tests/units/node/__main__.py
+++ b/tests/units/node/__main__.py
@@ -3,6 +3,7 @@
from __init__ import tests
from auth import *
+from downloads import *
from files import *
from node import *
from obs import *
diff --git a/tests/units/node/downloads.py b/tests/units/node/downloads.py
new file mode 100755
index 0000000..29ea5d0
--- /dev/null
+++ b/tests/units/node/downloads.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import json
+from os.path import exists
+
+from __init__ import tests
+
+from sugar_network.node import downloads
+from sugar_network.toolkit import coroutine
+
+
+class DownloadsTest(tests.Test):
+
+ def test_populate(self):
+ self.touch(('1', '1'))
+ self.touch(('2', '2'))
+ self.touch(('2.tag', json.dumps((2, {'file': 2}))))
+ self.touch(('3.tag', json.dumps((3, {'file': 3}))))
+
+ pool = downloads.Pool('.')
+ self.assertEqual(None, pool.get(1))
+ self.assertEqual(2, pool.get(2).tag['file'])
+ self.assertEqual('2', pool.get(2).open().read())
+ self.assertEqual(None, pool.get(3))
+
+ def test_ComplexKeys(self):
+ key = {-1: None}
+
+ self.touch(('file', 'file'))
+ self.touch(('file.tag', json.dumps((key, {'file': 2}))))
+
+ pool = downloads.Pool('.')
+ self.assertEqual('file', pool.get(key).open().read())
+ key['foo'] = 'bar'
+ pool.set(key, None, lambda path: file(path, 'w').close())
+ self.assertNotEqual(None, pool.get(key))
+
+ def test_set_Tags(self):
+ tag = []
+
+ def fetch(path):
+ with file(path, 'w') as f:
+ f.write('payload')
+ tag.append(True)
+
+ pool = downloads.Pool('.')
+ dl = pool.set('key', tag, fetch)
+ self.assertEqual(False, dl.ready)
+ self.assertEqual(None, dl.open())
+ self.assertEqual([], tag)
+
+ coroutine.dispatch()
+ self.assertEqual(True, dl.ready)
+ self.assertEqual('payload', dl.open().read())
+ self.assertEqual([True], tag)
+
+ pool2 = downloads.Pool('.')
+ dl2 = pool2.get('key')
+ self.assertEqual(True, dl2.ready)
+ self.assertEqual('payload', dl2.open().read())
+ self.assertEqual([True], tag)
+
+ def test_Eject(self):
+ downloads._POOL_SIZE = 3
+ pool = downloads.Pool('.')
+
+ pool.set(1, None, lambda path: file(path, 'w').close())
+ coroutine.dispatch()
+ file1 = pool.get(1).open().name
+ pool.set(2, None, lambda path: file(path, 'w').close())
+ coroutine.dispatch()
+ file2 = pool.get(2).open().name
+ pool.set(3, None, lambda path: file(path, 'w').close())
+ coroutine.dispatch()
+ file3 = pool.get(3).open().name
+
+ assert pool.get(1) is not None
+ assert exists(file1)
+ assert exists(file1 + '.tag')
+ assert pool.get(2) is not None
+ assert exists(file2)
+ assert exists(file2 + '.tag')
+ assert pool.get(3) is not None
+ assert exists(file3)
+ assert exists(file3 + '.tag')
+
+ pool.set(4, None, lambda path: file(path, 'w').close())
+ pool.set(5, None, lambda path: file(path, 'w').close())
+ pool.set(6, None, lambda path: file(path, 'w').close())
+
+ assert pool.get(1) is None
+ assert not exists(file1)
+ assert not exists(file1 + '.tag')
+ assert pool.get(2) is None
+ assert not exists(file2)
+ assert not exists(file2 + '.tag')
+ assert pool.get(3) is None
+ assert not exists(file3)
+ assert not exists(file3 + '.tag')
+
+ def test_remove(self):
+ pool = downloads.Pool('.')
+
+ pool.set(1, None, lambda path: file(path, 'w').close())
+ coroutine.dispatch()
+ file1 = pool.get(1).open().name
+ assert pool.get(1) is not None
+ assert exists(file1)
+ assert exists(file1 + '.tag')
+
+ pool.remove(1)
+ assert pool.get(1) is None
+ assert not exists(file1)
+ assert not exists(file1 + '.tag')
+
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py
index 6e512f9..9cbb6a5 100755
--- a/tests/units/node/sync.py
+++ b/tests/units/node/sync.py
@@ -3,8 +3,8 @@
import os
import uuid
+import json
from StringIO import StringIO
-import cPickle as pickle
from os.path import exists
from __init__ import tests
@@ -17,13 +17,13 @@ class SyncTest(tests.Test):
def test_decode(self):
stream = StringIO()
- pickle.dump({'foo': 'bar'}, stream)
+ dump({'foo': 'bar'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
self.assertRaises(EOFError, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- pickle.dump({'packet': 1, 'bar': 'foo'}, stream)
+ dump({'packet': 1, 'bar': 'foo'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
with next(packets_iter) as packet:
@@ -34,7 +34,7 @@ class SyncTest(tests.Test):
self.assertRaises(EOFError, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- pickle.dump({'payload': 1}, stream)
+ dump({'payload': 1}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
with next(packets_iter) as packet:
@@ -45,8 +45,8 @@ class SyncTest(tests.Test):
self.assertRaises(EOFError, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- pickle.dump({'packet': 2}, stream)
- pickle.dump({'payload': 2}, stream)
+ dump({'packet': 2}, stream)
+ dump({'payload': 2}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
with next(packets_iter) as packet:
@@ -62,7 +62,7 @@ class SyncTest(tests.Test):
self.assertRaises(EOFError, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- pickle.dump({'packet': 'last'}, stream)
+ dump({'packet': 'last'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
with next(packets_iter) as packet:
@@ -84,13 +84,13 @@ class SyncTest(tests.Test):
self.assertEqual(len(stream.getvalue()), stream.tell())
stream = StringIO()
- pickle.dump({'foo': 'bar'}, stream)
+ dump({'foo': 'bar'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
self.assertRaises(EOFError, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
- pickle.dump({'packet': 'last'}, stream)
+ dump({'packet': 'last'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
self.assertRaises(StopIteration, packets_iter.next)
@@ -98,13 +98,13 @@ class SyncTest(tests.Test):
def test_decode_SkipPackets(self):
stream = StringIO()
- pickle.dump({'packet': 1}, stream)
- pickle.dump({'payload': 1}, stream)
- pickle.dump({'payload': 11}, stream)
- pickle.dump({'payload': 111}, stream)
- pickle.dump({'packet': 2}, stream)
- pickle.dump({'payload': 2}, stream)
- pickle.dump({'packet': 'last'}, stream)
+ dump({'packet': 1}, stream)
+ dump({'payload': 1}, stream)
+ dump({'payload': 11}, stream)
+ dump({'payload': 111}, stream)
+ dump({'packet': 2}, stream)
+ dump({'payload': 2}, stream)
+ dump({'packet': 'last'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
@@ -133,21 +133,21 @@ class SyncTest(tests.Test):
def test_encode(self):
self.assertEqual([
- pickle.dumps({'packet': 'last'}),
+ dumps({'packet': 'last'}),
],
[i for i in sync.encode()])
self.assertEqual([
- pickle.dumps({'packet': None, 'foo': 'bar'}),
- pickle.dumps({'packet': 'last'}),
+ dumps({'packet': None, 'foo': 'bar'}),
+ dumps({'packet': 'last'}),
],
[i for i in sync.encode((None, None, None), foo='bar')])
self.assertEqual([
- pickle.dumps({'packet': 1}),
- pickle.dumps({'packet': '2', 'n': 2}),
- pickle.dumps({'packet': '3', 'n': 3}),
- pickle.dumps({'packet': 'last'}),
+ dumps({'packet': 1}),
+ dumps({'packet': '2', 'n': 2}),
+ dumps({'packet': '3', 'n': 3}),
+ dumps({'packet': 'last'}),
],
[i for i in sync.encode(
(1, {}, None),
@@ -156,16 +156,16 @@ class SyncTest(tests.Test):
)])
self.assertEqual([
- pickle.dumps({'packet': 1}),
- pickle.dumps({1: 1}),
- pickle.dumps({'packet': 2}),
- pickle.dumps({2: 2}),
- pickle.dumps({2: 2}),
- pickle.dumps({'packet': 3}),
- pickle.dumps({3: 3}),
- pickle.dumps({3: 3}),
- pickle.dumps({3: 3}),
- pickle.dumps({'packet': 'last'}),
+ dumps({'packet': 1}),
+ dumps({1: 1}),
+ dumps({'packet': 2}),
+ dumps({2: 2}),
+ dumps({2: 2}),
+ dumps({'packet': 3}),
+ dumps({3: 3}),
+ dumps({3: 3}),
+ dumps({3: 3}),
+ dumps({'packet': 'last'}),
],
[i for i in sync.encode(
(1, None, [{1: 1}]),
@@ -174,8 +174,8 @@ class SyncTest(tests.Test):
)])
def test_limited_encode(self):
- header_size = len(pickle.dumps({'packet': 'first'}))
- record_size = len(pickle.dumps({'record': 0}))
+ header_size = len(dumps({'packet': 'first'}))
+ record_size = len(dumps({'record': 0}))
def content():
yield {'record': 1}
@@ -183,32 +183,32 @@ class SyncTest(tests.Test):
yield {'record': 3}
i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'record': 1}, pickle.loads(i.send(header_size)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + 1)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + 1)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + record_size * 2, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'record': 1}, pickle.loads(i.send(header_size)))
- self.assertEqual({'record': 2}, pickle.loads(i.send(header_size + record_size)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size * 2)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
+ self.assertEqual({'record': 2}, json.loads(i.send(header_size + record_size)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size * 2)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + record_size * 2, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'record': 1}, pickle.loads(i.send(header_size)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + record_size + 1)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size + 1)))
self.assertRaises(StopIteration, i.next)
def test_limited_encode_FinalRecords(self):
- header_size = len(pickle.dumps({'packet': 'first'}))
- record_size = len(pickle.dumps({'record': 0}))
+ header_size = len(dumps({'packet': 'first'}))
+ record_size = len(dumps({'record': 0}))
def content():
try:
@@ -221,24 +221,24 @@ class SyncTest(tests.Test):
yield {'record': 5}
i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'record': 4}, pickle.loads(i.send(header_size + 1)))
- self.assertEqual({'record': 5}, pickle.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'record': 4}, json.loads(i.send(header_size + 1)))
+ self.assertEqual({'record': 5}, json.loads(i.send(999999999)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + record_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'record': 1}, pickle.loads(i.send(header_size)))
- self.assertEqual({'record': 4}, pickle.loads(i.send(header_size + record_size * 2 - 1)))
- self.assertEqual({'record': 5}, pickle.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
+ self.assertEqual({'record': 4}, json.loads(i.send(header_size + record_size * 2 - 1)))
+ self.assertEqual({'record': 5}, json.loads(i.send(999999999)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
self.assertRaises(StopIteration, i.next)
def test_limited_encode_Blobs(self):
- header_size = len(pickle.dumps({'packet': 'first'}))
- blob_header_size = len(pickle.dumps({'blob_size': 100}))
- record_size = len(pickle.dumps({'record': 2}))
+ header_size = len(dumps({'packet': 'first'}))
+ blob_header_size = len(dumps({'blob_size': 100}))
+ record_size = len(dumps({'record': 2}))
self.touch(('blob', '*' * 100))
def content():
@@ -247,36 +247,36 @@ class SyncTest(tests.Test):
yield {'record': 3}
i = sync.limited_encode(header_size + blob_header_size + 99, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + blob_header_size + 100, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + blob_header_size + 100 + record_size - 1, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100)))
self.assertRaises(StopIteration, i.next)
i = sync.limited_encode(header_size + blob_header_size + 100 + record_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'record': 2}, pickle.loads(i.send(header_size + blob_header_size + 100)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(header_size + blob_header_size + 100 + record_size)))
+ self.assertEqual({'record': 2}, json.loads(i.send(header_size + blob_header_size + 100)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100 + record_size)))
self.assertRaises(StopIteration, i.next)
def test_limited_encode_FinalBlobs(self):
- header_size = len(pickle.dumps({'packet': 'first'}))
- blob_header_size = len(pickle.dumps({'blob_size': 100}))
- record_size = len(pickle.dumps({'record': 2}))
+ header_size = len(dumps({'packet': 'first'}))
+ blob_header_size = len(dumps({'blob_size': 100}))
+ record_size = len(dumps({'record': 2}))
self.touch(('blob', '*' * 100))
def content():
@@ -288,49 +288,49 @@ class SyncTest(tests.Test):
yield {'record': 3}
i = sync.limited_encode(header_size, ('first', None, content()), ('second', None, content()))
- self.assertEqual({'packet': 'first'}, pickle.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, pickle.loads(i.send(header_size)))
+ self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
+ self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
self.assertEqual('*' * 100, i.send(999999999))
- self.assertEqual({'record': 3}, pickle.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, pickle.loads(i.send(999999999)))
+ self.assertEqual({'record': 3}, json.loads(i.send(999999999)))
+ self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
self.assertRaises(StopIteration, i.next)
def test_chunked_encode(self):
output = sync.chunked_encode()
- self.assertEqual({'packet': 'last'}, pickle.loads(decode_chunked(output.read(100))))
+ self.assertEqual({'packet': 'last'}, json.loads(decode_chunked(output.read(100))))
data = [{'foo': 1}, {'bar': 2}]
- data_stream = pickle.dumps({'packet': 'packet'})
+ data_stream = dumps({'packet': 'packet'})
for record in data:
- data_stream += pickle.dumps(record)
- data_stream += pickle.dumps({'packet': 'last'})
+ data_stream += dumps(record)
+ data_stream += dumps({'packet': 'last'})
output = sync.chunked_encode(('packet', None, iter(data)))
- dump = StringIO()
+ payload = StringIO()
while True:
chunk = output.read(1)
if not chunk:
break
- dump.write(chunk)
- self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+ payload.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(payload.getvalue()))
output = sync.chunked_encode(('packet', None, iter(data)))
- dump = StringIO()
+ payload = StringIO()
while True:
chunk = output.read(2)
if not chunk:
break
- dump.write(chunk)
- self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+ payload.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(payload.getvalue()))
output = sync.chunked_encode(('packet', None, iter(data)))
- dump = StringIO()
+ payload = StringIO()
while True:
chunk = output.read(1000)
if not chunk:
break
- dump.write(chunk)
- self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+ payload.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(payload.getvalue()))
def test_encode_Blobs(self):
self.touch(('1', 'a'))
@@ -338,15 +338,15 @@ class SyncTest(tests.Test):
self.touch(('3', 'ccc'))
self.assertEqual([
- pickle.dumps({'packet': 1}),
- pickle.dumps({'num': 1, 'blob_size': 1}),
+ dumps({'packet': 1}),
+ dumps({'num': 1, 'blob_size': 1}),
'a',
- pickle.dumps({'num': 2, 'blob_size': 2}),
+ dumps({'num': 2, 'blob_size': 2}),
'bb',
- pickle.dumps({'packet': 2}),
- pickle.dumps({'num': 3, 'blob_size': 3}),
+ dumps({'packet': 2}),
+ dumps({'num': 3, 'blob_size': 3}),
'ccc',
- pickle.dumps({'packet': 'last'}),
+ dumps({'packet': 'last'}),
],
[i for i in sync.encode(
(1, None, [{'num': 1, 'blob': '1'}, {'num': 2, 'blob': '2'}]),
@@ -355,15 +355,15 @@ class SyncTest(tests.Test):
def test_decode_Blobs(self):
stream = StringIO()
- pickle.dump({'packet': 1}, stream)
- pickle.dump({'num': 1, 'blob_size': 1}, stream)
+ dump({'packet': 1}, stream)
+ dump({'num': 1, 'blob_size': 1}, stream)
stream.write('a')
- pickle.dump({'num': 2, 'blob_size': 2}, stream)
+ dump({'num': 2, 'blob_size': 2}, stream)
stream.write('bb')
- pickle.dump({'packet': 2}, stream)
- pickle.dump({'num': 3, 'blob_size': 3}, stream)
+ dump({'packet': 2}, stream)
+ dump({'num': 3, 'blob_size': 3}, stream)
stream.write('ccc')
- pickle.dump({'packet': 'last'}, stream)
+ dump({'packet': 'last'}, stream)
stream.seek(0)
packets_iter = sync.decode(stream)
@@ -383,6 +383,29 @@ class SyncTest(tests.Test):
self.assertRaises(StopIteration, packets_iter.next)
self.assertEqual(len(stream.getvalue()), stream.tell())
+ def test_decode_SkipNotReadBlobs(self):
+ stream = StringIO()
+ dump({'packet': 1}, stream)
+ dump({'num': 1, 'blob_size': 1}, stream)
+ stream.write('a')
+ dump({'num': 2, 'blob_size': 2}, stream)
+ stream.write('bb')
+ dump({'packet': 2}, stream)
+ dump({'num': 3, 'blob_size': 3}, stream)
+ stream.write('ccc')
+ dump({'packet': 'last'}, stream)
+ stream.seek(0)
+
+ packets_iter = sync.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([1, 2], [i['num'] for i in packet])
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual([3], [i['num'] for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
def test_sneakernet_decode(self):
sync.sneakernet_encode([
('first', {'packet_prop': 1}, [
@@ -469,5 +492,14 @@ def decode_chunked(encdata):
return newdata
+def dump(value, stream):
+ stream.write(json.dumps(value))
+ stream.write('\n')
+
+
+def dumps(value):
+ return json.dumps(value) + '\n'
+
+
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/node/sync_master.py b/tests/units/node/sync_master.py
index 9d60e3b..ef09845 100755
--- a/tests/units/node/sync_master.py
+++ b/tests/units/node/sync_master.py
@@ -2,10 +2,12 @@
# sugar-lint: disable
import os
+import gzip
import time
import json
import base64
import hashlib
+from glob import glob
from os.path import join, exists
from StringIO import StringIO
@@ -20,9 +22,13 @@ from sugar_network.node.master import MasterCommands
from sugar_network.resources.volume import Volume
from sugar_network.toolkit.router import Router
from sugar_network.toolkit import coroutine, util
+from sugar_network.toolkit.rrd import Rrd
-CHUNK = 100000
+class statvfs(object):
+
+ f_bfree = None
+ f_frsize = 1
class SyncMasterTest(tests.Test):
@@ -32,6 +38,8 @@ class SyncMasterTest(tests.Test):
self.uuid = 0
self.override(db, 'uuid', self.next_uuid)
+ self.override(os, 'statvfs', lambda *args: statvfs())
+ statvfs.f_bfree = 999999999
class Document(db.Document):
@@ -39,6 +47,7 @@ class SyncMasterTest(tests.Test):
def prop(self, value):
return value
+ node.files_root.value = 'sync'
self.volume = Volume('master', [Document])
self.master = MasterCommands(self.volume)
@@ -60,7 +69,7 @@ class SyncMasterTest(tests.Test):
{'commit': [[1, 1]]},
]),
('pull', {'sequence': [[1, None]]}, None),
- ):
+ dst='localhost:8888'):
request.content_stream.write(chunk)
request.content_stream.seek(0)
@@ -87,7 +96,7 @@ class SyncMasterTest(tests.Test):
}},
{'commit': [[2, 2]]},
]),
- ):
+ dst='localhost:8888'):
request.content_stream.write(chunk)
request.content_stream.seek(0)
@@ -96,6 +105,7 @@ class SyncMasterTest(tests.Test):
response.write(chunk)
response.seek(0)
self.assertEqual([
+ ({'packet': 'ack', 'ack': [[2, 2]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []),
({'packet': 'diff', 'src': 'localhost:8888'}, [
{'document': 'document'},
{'guid': '1', 'diff': {
@@ -106,702 +116,444 @@ class SyncMasterTest(tests.Test):
}},
{'commit': [[1, 1]]},
]),
- ({'packet': 'ack', 'ack': [[2, 2]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []),
],
[(packet.props, [i for i in packet]) for packet in sync.decode(response)])
- def __test_push_MisaddressedPackets(self):
- master = MasterCommands('master')
- response = db.Response()
-
- packet = OutBufferPacket()
- request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- self.assertRaises(RuntimeError, master.push, request, response)
-
- packet = OutBufferPacket(src='node')
+ def test_MisaddressedPackets(self):
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- self.assertRaises(RuntimeError, master.push, request, response)
+ for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None)):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
+ self.assertRaises(RuntimeError, lambda: next(self.master.sync(request)))
- packet = OutBufferPacket(dst='master')
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- self.assertRaises(RuntimeError, master.push, request, response)
+ for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None), dst='fake'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
+ self.assertRaises(RuntimeError, lambda: next(self.master.sync(request)))
- packet = OutBufferPacket(src='node', dst='fake')
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- self.assertRaises(RuntimeError, master.push, request, response)
+ for chunk in sync.encode(('pull', {'sequence': [[1, None]]}, None), dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
+ next(self.master.sync(request))
- packet = OutBufferPacket(src='master', dst='master')
- request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- self.assertRaises(RuntimeError, master.push, request, response)
+ def test_push_WithoutCookies(self):
+ ts = int(time.time())
- packet = OutBufferPacket(src='node', dst='master')
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
- master.push(request, response)
+ for chunk in sync.encode(
+ ('diff', None, [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ 'prop': {'value': 'value', 'mtime': 1},
+ }},
+ {'commit': [[1, 1]]},
+ ]),
+ ('stats_diff', {'dst': 'localhost:8888'}, [
+ {'db': 'db', 'user': 'user'},
+ {'timestamp': ts, 'values': {'field': 1.0}},
+ {'commit': {'user': {'db': [[1, ts]]}}},
+ ]),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
- def __test_push_ProcessPushes(self):
- master = MasterCommands('master')
- request = Request()
response = db.Response()
-
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}},
- {'cmd': 'sn_push', 'guid': '2', 'diff': {'guid': {'value': '2', 'mtime': 1}}},
- {'cmd': 'sn_push', 'guid': '3', 'diff': {'guid': {'value': '3', 'mtime': 1}}},
- {'cmd': 'sn_commit', 'sequence': [[1, 3]]},
- ])
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
- reply = master.push(request, response)
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
+ self.assertEqual([
+ ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[1, 1]], 'dst': None}, []),
+ ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[1, ts]]}}, 'src': 'localhost:8888', 'dst': None}, []),
+ ],
+ [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
self.assertEqual([
'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
],
response.get('Set-Cookie'))
- self.assertEqual(
- ['1', '2', '3'],
- [i.guid for i in master.volume['document'].find()[0]])
-
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual('node', packet.header.get('dst'))
- self.assertEqual([
- {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 3]],'merged': [[1, 3]]},
- ],
- [i for i in packet])
-
- def __test_push_ProcessPulls(self):
- master = MasterCommands('master')
+ def test_push_WithCookies(self):
+ ts = int(time.time())
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'sn_pull', 'sequence': [[1, 1]]},
- {'cmd': 'sn_pull', 'sequence': [[3, 4]]},
- {'cmd': 'sn_pull', 'sequence': [[7, None]]},
- ])
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
+ for chunk in sync.encode(
+ ('pull', {'sequence': [[1, None]]}, None),
+ ('files_pull', {'sequence': [[1, None]]}, None),
+ ('diff', None, [
+ {'document': 'document'},
+ {'guid': '2', 'diff': {
+ 'guid': {'value': '2', 'mtime': 2},
+ 'ctime': {'value': 2, 'mtime': 2},
+ 'mtime': {'value': 2, 'mtime': 2},
+ 'prop': {'value': 'value', 'mtime': 2},
+ }},
+ {'commit': [[2, 2]]},
+ ]),
+ ('stats_diff', {'dst': 'localhost:8888'}, [
+ {'db': 'db', 'user': 'user'},
+ {'timestamp': ts + 1, 'values': {'field': 2.0}},
+ {'commit': {'user': {'db': [[2, ts]]}}},
+ ]),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
response = db.Response()
- reply = master.push(request, response)
- assert reply is None
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1], [3, 4], [7, None]]})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
+ ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[2, 2]], 'dst': None}, []),
+ ({'packet': 'stats_ack', 'sequence': {'user': {'db': [[2, ts]]}}, 'src': 'localhost:8888', 'dst': None}, []),
],
- response.get('Set-Cookie'))
-
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'files_pull', 'directory': 'dir1', 'sequence': [[1, 1]]},
- {'cmd': 'files_pull', 'directory': 'dir1', 'sequence': [[3, 4]]},
- {'cmd': 'files_pull', 'directory': 'dir2', 'sequence': [[7, None]]},
- ])
- request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
- response = db.Response()
- reply = master.push(request, response)
- assert reply is None
+ [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'dir1': [[1, 1], [3, 4]], 'dir2': [[7, None]]})),
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[2, None]]), ('files_pull', None, [[1, None]])])),
'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'sn_pull', 'sequence': [[1, 1]]},
- {'cmd': 'files_pull', 'directory': 'dir', 'sequence': [[1, 1]]},
- ])
+ def test_push_CollectCookies(self):
request = Request()
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])]))
+ for chunk in sync.encode(
+ ('pull', {'sequence': [[11, None]]}, None),
+ ('files_pull', {'sequence': [[11, None]]}, None),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
response = db.Response()
- reply = master.push(request, response)
- assert reply is None
- self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'dir': [[1, 1]]})),
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
+ self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
+ self.assertEqual([
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])])),
'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- def __test_push_TweakPullAccordingToPush(self):
- master = MasterCommands('master')
request = Request()
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[10, None]]), ('files_pull', None, [[10, None]])]))
+ for chunk in sync.encode(
+ ('pull', {'sequence': [[1, 5]]}, None),
+ ('files_pull', {'sequence': [[1, 5]]}, None),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
response = db.Response()
-
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}},
- {'cmd': 'sn_commit', 'sequence': [[1, 1]]},
- {'cmd': 'sn_pull', 'sequence': [[1, None]]},
- ])
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
- reply = master.push(request, response)
-
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual('node', packet.header.get('dst'))
- self.assertEqual([
- {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 1]], 'merged': [[1, 1]]},
- ],
- [i for i in packet])
- self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})),
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
+ self.assertEqual([], [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
+ self.assertEqual([
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, 5], [10, None]]), ('files_pull', None, [[1, 5], [10, None]])])),
'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- def __test_push_DoNotTweakPullAccordingToPushIfCookieWasPassed(self):
- master = MasterCommands('master')
- request = Request()
- response = db.Response()
+ def test_push_ExcludeAcksFromCookies(self):
+ ts = int(time.time())
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(document='document', data=[
- {'cmd': 'sn_push', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}},
- {'cmd': 'sn_commit', 'sequence': [[1, 1]]},
- {'cmd': 'sn_pull', 'sequence': [[1, None]]},
- ])
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
- request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
- base64.b64encode(json.dumps({'sn_pull': [[1, None]]}))
-
- reply = master.push(request, response)
+ request = Request()
+ for chunk in sync.encode(
+ ('pull', {'sequence': [[1, None]]}, None),
+ ('diff', None, [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ 'prop': {'value': 'value', 'mtime': 1},
+ }},
+ {'commit': [[10, 10]]},
+ ]),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual('node', packet.header.get('dst'))
+ response = db.Response()
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 1]], 'merged': [[1, 1]]},
+ ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []),
],
- [i for i in packet])
+ [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[2, None]])])),
'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
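
What this test pins down: the pushed diff was merged as the master's seqno 1 (the ack packet reports ack=[[1, 1]]), and exactly that range is excluded from the node's requested pull of [[1, None]], so the re-issued cookie carries [[2, None]] and the node will not pull back its own push. A rough illustration of that range arithmetic, as a sketch only, not the toolkit's actual Sequence implementation:

def exclude(sequence, start, end):
    # Remove [start, end] from a sorted list of [first, last]
    # ranges, where last may be None for an open-ended range.
    result = []
    for first, last in sequence:
        if last is not None and last < start:
            result.append([first, last])    # entirely below
        elif first > end:
            result.append([first, last])    # entirely above
        else:
            if first < start:
                result.append([first, start - 1])
            if last is None or last > end:
                result.append([end + 1, last])
    return result

# exclude([[1, None]], 1, 1)          -> [[2, None]]
# exclude([[1, 5], [10, None]], 2, 3) -> [[1, 1], [4, 5], [10, None]]
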
- def __test_push_ProcessStatsPushes(self):
- master = MasterCommands('master')
- request = Request()
- response = db.Response()
-
- ts = int(time.time()) - 1000
- packet = OutBufferPacket(src='node', dst='master')
- packet.push(cmd='stats_push', user='user1', db='db1', sequence=[[1, ts + 2]], data=[
- {'timestamp': ts + 1, 'values': {'f': 1}},
- {'timestamp': ts + 2, 'values': {'f': 2}},
- ])
- packet.push(cmd='stats_push', user='user1', db='db2', sequence=[[2, ts + 3]], data=[
- {'timestamp': ts + 3, 'values': {'f': 3}},
- ])
- packet.push(cmd='stats_push', user='user2', db='db3', sequence=[[ts + 4, ts + 4]], data=[
- {'timestamp': ts + 4, 'values': {'f': 4}},
- ])
- request.content_stream = packet.pop()
- request.content_length = len(request.content_stream.getvalue())
-
- reply = master.push(request, response)
- self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('Set-Cookie'))
-
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual('node', packet.header.get('dst'))
- self.assertEqual([
- {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'stats_ack',
- 'sequence': {
- 'user1': {'db1': [[1, ts + 2]], 'db2': [[2, ts + 3]]},
- 'user2': {'db3': [[ts + 4, ts + 4]]},
- },
- }
- ],
- [i for i in packet])
-
- __, __, values = rrdtool.fetch('stats/user/us/user1/db1.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 2))
- self.assertEqual([(1,), (2,), (None,)], values)
-
- __, __, values = rrdtool.fetch('stats/user/us/user1/db2.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 3))
- self.assertEqual([(None,), (None,), (3,), (None,)], values)
+ def test_push_DoNotExcludeAcksFromExistingCookies(self):
+ ts = int(time.time())
- __, __, values = rrdtool.fetch('stats/user/us/user2/db3.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 4))
- self.assertEqual([(None,), (None,), (None,), (4,), (None,)], values)
-
- def __test_pull_ProcessPulls(self):
- master = MasterCommands('master')
request = Request()
- response = db.Response()
-
- master.volume['document'].create(guid='1')
- master.volume['document'].create(guid='2')
-
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]])]))
+ for chunk in sync.encode(
+ ('diff', None, [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ 'prop': {'value': 'value', 'mtime': 1},
+ }},
+ {'commit': [[10, 10]]},
+ ]),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
- request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
response = db.Response()
- reply = master.pull(request, response)
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
- self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
- ],
- response.get('Set-Cookie'))
-
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
- 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
- 'prop': {'value': '', 'mtime': os.stat('db/document/2/2/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]},
+ ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []),
],
- [i for i in packet])
-
- response = db.Response()
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
+ 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
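
By now the same boilerplate has appeared in every push test: encode packets with sync.encode, spool the chunks into the request body, rewind, then drain master.push() into a StringIO. Should more push tests accumulate, a helper along these lines could fold that away (a sketch assuming the surrounding module's Request, db and sync; make_push is not an existing helper):

from StringIO import StringIO

def make_push(master, *packets):
    # Feed the encoded packets to master.push() and return the
    # spooled reply stream together with the response object.
    request = Request()
    for chunk in sync.encode(*packets, dst='localhost:8888'):
        request.content_stream.write(chunk)
    request.content_stream.seek(0)
    response = db.Response()
    reply = StringIO()
    for chunk in master.push(request, response):
        reply.write(chunk)
    reply.seek(0)
    return reply, response
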
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
- self.assertEqual(3, len([i for i in packet]))
- def __test_pull_AvoidEmptyPacketsOnPull(self):
- master = MasterCommands('master')
- request = Request()
- response = db.Response()
-
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
+ def test_push_ExcludeAcksFromCookiesUsingProperLayer(self):
+ ts = int(time.time())
request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]])]))
+ for chunk in sync.encode(
+ ('pull', {'sequence': [[1, None]], 'layer': 'hidden'}, None),
+ ('diff', None, [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'ctime': {'value': 1, 'mtime': 1},
+ 'mtime': {'value': 1, 'mtime': 1},
+ 'prop': {'value': 'value', 'mtime': 1},
+ }},
+ {'commit': [[10, 10]]},
+ ]),
+ dst='localhost:8888'):
+ request.content_stream.write(chunk)
+ request.content_stream.seek(0)
+
response = db.Response()
- reply = master.pull(request, response)
- assert reply is None
- self.assertEqual('application/x-tar', response.content_type)
+ reply = StringIO()
+ for chunk in self.master.push(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ ({'packet': 'ack', 'ack': [[1, 1]], 'src': 'localhost:8888', 'sequence': [[10, 10]], 'dst': None}, []),
],
- response.get('Set-Cookie'))
-
- def __test_pull_LimittedPull(self):
- master = MasterCommands('master')
-
- master.volume['document'].create(guid='1', prop='*' * CHUNK)
- master.volume['document'].create(guid='2', prop='*' * CHUNK)
-
- request = Request()
- response = db.Response()
- reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
-
- request = Request()
- response = db.Response()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
- reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ [(packet.props, [i for i in packet]) for packet in sync.decode(reply)])
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', 'hidden', [[2, None]]), ('pull', None, [[1, None]])])),
+ 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
- self.assertEqual([
- {'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
- 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime},
- }},
- {'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 1]]},
- ],
- [i for i in packet])
+ def test_pull(self):
+ self.volume['document'].create(guid='1', prop='1', ctime=1, mtime=1)
+ self.volume['document'].create(guid='2', prop='2', ctime=2, mtime=2)
+ self.utime('master', 0)
+ self.touch(('sync/1', 'file1'))
+ self.touch(('sync/2', 'file2'))
request = Request()
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])]))
response = db.Response()
- reply = master.pull(request, response, accept_length=CHUNK * 1.5, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ self.assertEqual(None, self.master.pull(request, response))
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})),
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])),
'sugar_network_delay=30; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
- self.assertEqual(2, len([i for i in packet]))
-
- for i in master._pull_queue.values():
- i.unlink()
- master._pull_queue.clear()
-
- request = Request()
- response = db.Response()
- reply = master.pull(request, response, accept_length=CHUNK * 2.5, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
+ coroutine.sleep(.5)
request = Request()
+ request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
response = db.Response()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
- reply = master.pull(request, response, accept_length=CHUNK * 2.5, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ reply = StringIO()
+ for chunk in self.master.pull(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ ({'packet': 'diff'}, [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'prop': {'value': '1', 'mtime': 0},
+ 'guid': {'value': '1', 'mtime': 0},
+ 'ctime': {'value': 1, 'mtime': 0},
+ 'mtime': {'value': 1, 'mtime': 0}},
+ },
+ {'guid': '2', 'diff': {
+ 'prop': {'value': '2', 'mtime': 0},
+ 'guid': {'value': '2', 'mtime': 0},
+ 'ctime': {'value': 2, 'mtime': 0},
+ 'mtime': {'value': 2, 'mtime': 0}},
+ },
+ {'commit': [[1, 2]]},
+ ])
],
- response.get('Set-Cookie'))
-
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
+ [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
self.assertEqual([
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
- 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
- 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/2/2/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]},
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])),
+ 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
- [i for i in packet])
-
- def __test_pull_ReusePullSeqFromCookies(self):
- master = MasterCommands('master')
- request = Request()
- response = db.Response()
-
- master.volume['document'].create(guid='1')
-
- request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
- base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'foo': [[2, 2]]}))
-
- reply = master.pull(request, response)
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, 1]], 'foo': [[2, 2]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
+ response.get('Set-Cookie'))
request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
+ request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
response = db.Response()
- reply = master.pull(request, response)
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ reply = StringIO()
+ for chunk in self.master.pull(request, response):
+ reply.write(chunk)
+ reply.seek(0)
+ packets_iter = sync.decode(gzip.GzipFile(mode='r', fileobj=reply))
+ with next(packets_iter) as packet:
+ self.assertEqual('files_diff', packet.name)
+ records_iter = iter(packet)
+ self.assertEqual('file1', next(records_iter)['blob'].read())
+ self.assertEqual('file2', next(records_iter)['blob'].read())
+ self.assertEqual({'op': 'commit', 'sequence': [[1, 4]]}, next(records_iter))
+ self.assertRaises(StopIteration, records_iter.next)
+ self.assertRaises(StopIteration, packets_iter.next)
self.assertEqual([
'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
],
response.get('Set-Cookie'))
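
test_pull also documents the two-step protocol: the first request returns None, schedules the packet for background assembly and answers with a sugar_network_delay cookie; a later request that re-sends the sync cookie receives the finished stream, gzip-compressed, so it must be unwrapped before sync.decode. A client-side polling loop under those assumptions (poll_pull, the retry count and the nap interval are all illustrative):

import gzip
from StringIO import StringIO

def poll_pull(master, request, response, naps=10, nap=.5):
    for __ in range(naps):
        reply = master.pull(request, response)
        if reply is not None:
            buf = StringIO()
            for chunk in reply:
                buf.write(chunk)
            buf.seek(0)
            # pull replies come gzip-compressed
            return sync.decode(gzip.GzipFile(mode='r', fileobj=buf))
        # not ready yet: nap, then retry with the issued cookie
        coroutine.sleep(nap)
        request = Request()
        request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
        response = db.Response()
    raise RuntimeError('pull never became ready')
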
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
- self.assertEqual([
- {'cookie': {}, 'filename': 'master-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
- 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-1.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 1]]},
- ],
- [i for i in packet])
-
- def __test_pull_AskForNotYetReadyPull(self):
- master = MasterCommands('master')
-
- def diff(*args, **kwargs):
- for i in range(1024):
- yield str(i), i, {}
- coroutine.sleep(.1)
-
- self.override(Directory, 'diff', diff)
+ def test_pull_EmptyPackets(self):
+ self.master._pulls = {
+ 'pull': lambda layer, seq, out_seq=None: \
+ ('diff', None, [{'layer': layer, 'seq': seq}]),
+ }
request = Request()
+ request.environ['HTTP_COOKIE'] = 'sugar_network_sync=%s' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]])]))
response = db.Response()
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
+ self.assertEqual(None, self.master.pull(request, response))
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]])])),
'sugar_network_delay=30; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
-
- coroutine.sleep(1)
+ coroutine.sleep(.5)
+ self.assertEqual(1, len([i for i in glob('tmp/pulls/*.tag')]))
request = Request()
+ request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
response = db.Response()
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
+ self.assertEqual(None, self.master.pull(request, response))
self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
+ 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
+ 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
],
response.get('Set-Cookie'))
+ self.assertEqual(0, len([i for i in glob('tmp/pulls/*.tag')]))

- def __test_clone(self):
- master = MasterCommands('master')
- request = Request()
- response = db.Response()
+ def test_pull_FullClone(self):
- master.volume['document'].create(guid='1')
- master.volume['document'].create(guid='2')
+ def diff(layer, seq, out_seq):
+ out_seq.include(1, 10)
+ yield {'layer': layer, 'seq': seq}
- reply = master.pull(request, response)
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
+ self.master._pulls = {
+ 'pull': lambda layer, seq, out_seq: ('diff', None, diff(layer, seq, out_seq)),
+ 'files_pull': lambda layer, seq, out_seq: ('files_diff', None, diff(layer, seq, out_seq)),
+ }
request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
response = db.Response()
- reply = master.pull(request, response)
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ self.assertEqual(None, self.master.pull(request, response))
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('pull', None, [[1, None]]), ('files_pull', None, [[1, None]])])),
+ 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
+ coroutine.sleep(.5)
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
- self.assertEqual([
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': {
- 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime},
- 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': {
- 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime},
- 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime},
- 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime},
- 'prop': {'value': '', 'mtime': os.stat('db/document/2/2/prop').st_mtime},
- }},
- {'cookie': {}, 'filename': 'master-2.packet', 'cmd': 'sn_commit', 'src': 'master', 'sequence': [[1, 2]]},
- ],
- [i for i in packet])
-
- def __test_pull_ProcessFilePulls(self):
- node.sync_dirs.value = ['files']
- seqno = util.Seqno('seqno')
- master = MasterCommands('master')
request = Request()
+ request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
response = db.Response()
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
-
- reply = master.pull(request, response, files='[[1, null]]')
- assert reply is None
- self.assertEqual(None, response.content_type)
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'files': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- coroutine.sleep(1)
-
- request = Request()
- request.environ['HTTP_COOKIE'] = ';'.join(cookie)
- response = db.Response()
- reply = master.pull(request, response)
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ reply = StringIO()
+ for chunk in self.master.pull(request, response):
+ reply.write(chunk)
+ reply.seek(0)
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ ({'packet': 'diff'}, [{'layer': None, 'seq': [[1, None]]}]),
],
- response.get('Set-Cookie'))
- packet = InPacket(stream=reply)
- self.assertEqual('master', packet.header['src'])
- self.assertEqual(None, packet.header.get('dst'))
-
- response = db.Response()
- reply = master.pull(request, response, files='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
self.assertEqual([
- 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
- 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % \
+ base64.b64encode(json.dumps([('files_pull', None, [[1, None]])])),
+ 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
],
response.get('Set-Cookie'))
- self.assertEqual(
- sorted([
- {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'},
- {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '2', 'content_type': 'blob', 'path': '2'},
- {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '3'},
- {'filename': 'master-0.packet', 'src': 'master', 'cookie': {}, 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 3]]},
- ]),
- read_records(reply))
-
- def __test_ReuseCachedPulls(self):
- master = MasterCommands('master')
-
- cached_pull = join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull')
- with OutPacket(stream=file(cached_pull, 'w'), probe='test', cookie={}) as packet:
- packet.push(data=[None])
request = Request()
+ request.environ['HTTP_COOKIE'] = response.get('Set-Cookie')[0]
response = db.Response()
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
+ reply = StringIO()
+ for chunk in self.master.pull(request, response):
+ reply.write(chunk)
+ reply.seek(0)
+ self.assertEqual([
+ ({'packet': 'files_diff'}, [{'layer': None, 'seq': [[1, None]]}]),
+ ],
+ [(packet.props, [i for i in packet]) for packet in sync.decode(gzip.GzipFile(mode='r', fileobj=reply))])
self.assertEqual([
'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
],
response.get('Set-Cookie'))
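
These last two tests use master._pulls as a seam: each entry maps an operation name to a callable taking (layer, seq, out_seq) and returning a (packet_name, layer, records) triple, with out_seq.include() reporting the range the packet actually covers. A stub in the same shape that also records its calls, should further pull variants need the treatment (make_stub is made up for illustration):

def make_stub(packet_name, log):
    def stub(layer, seq, out_seq):
        log.append((packet_name, layer, seq))
        out_seq.include(1, 10)    # pretend [1, 10] got packed
        return (packet_name, None, iter([{'layer': layer, 'seq': seq}]))
    return stub

# inside a test:
#   calls = []
#   self.master._pulls = {
#       'pull': make_stub('diff', calls),
#       'files_pull': make_stub('files_diff', calls),
#       }
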
- packet = InPacket(stream=reply)
- self.assertEqual('test', packet.header['probe'])
- for i in master._pull_queue.values():
- i.unlink()
- master._pull_queue.clear()
-
- cached_pull = join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull')
- with OutPacket(stream=file(cached_pull, 'w'), probe='test', cookie={'sn_pull': [[2, None]]}) as packet:
- packet.push(data=[None])
-
- request = Request()
- response = db.Response()
- reply = master.pull(request, response, sn_pull='[[1, null]]')
- assert reply is not None
- self.assertEqual('application/x-tar', response.content_type)
- self.assertEqual([
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})),
- 'sugar_network_delay=0; Max-Age=3600; HttpOnly',
- ],
- response.get('Set-Cookie'))
- packet = InPacket(stream=reply)
- self.assertEqual('test', packet.header['probe'])
+ def __test_pull_LimittedPull(self):
+ pass
- def __test_UnlinkCachedPullsOnEjectionFromQueue(self):
- sync_master._PULL_QUEUE_SIZE = 1
- master = MasterCommands('master')
+ def __test_pull_ReusePullSeqFromCookies(self):
+ pass
- master.volume['document'].create(guid='1')
- master.volume['document'].create(guid='2')
+ def __test_pull_AskForNotYetReadyPull(self):
+ pass
- response = db.Response()
- reply = master.pull(Request(), response, sn_pull='[[1, null]]')
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[1, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- assert exists(join('tmp', pull_hash({'sn_pull': [[1, None]]}) + '.pull'))
+ def __test_pull_ProcessFilePulls(self):
+ pass
- response = db.Response()
- reply = master.pull(Request(), response, sn_pull='[[2, null]]')
- cookie = [
- 'sugar_network_sync=%s; Max-Age=3600; HttpOnly' % base64.b64encode(json.dumps({'sn_pull': [[2, None]]})),
- 'sugar_network_delay=30; Max-Age=3600; HttpOnly',
- ]
- self.assertEqual(cookie, response.get('Set-Cookie'))
- assert not exists(join('tmp', pull_hash({'sn_push': [[1, None]]}) + '.pull'))
- assert exists(join('tmp', pull_hash({'sn_pull': [[2, None]]}) + '.pull'))
+ def __test_ReuseCachedPulls(self):
+ pass


def pull_hash(seq):
@@ -812,6 +564,7 @@ class Request(object):

def __init__(self):
self.content_stream = StringIO()
+ self.environ = {}


if __name__ == '__main__':