author     Aleksey Lim <alsroot@sugarlabs.org>    2012-06-12 11:06:32 (GMT)
committer  Aleksey Lim <alsroot@sugarlabs.org>    2012-06-12 11:06:32 (GMT)
commit     960845fe3c1792a49ed6dde7db86b9516b9cc08c (patch)
tree       0b36bb8fdd41a0bc701ac964db91a516d5be8ad8
parent     28498535d615941fd2655b1b59fef9bc4a88fcfd (diff)
Move sneakernet code from adserver
-rw-r--r--  sugar_network_server/sneakernet.py   274
-rw-r--r--  tests/__init__.py                      3
-rw-r--r--  tests/units/__main__.py                1
-rwxr-xr-x  tests/units/sneakernet.py            467
4 files changed, 745 insertions, 0 deletions
diff --git a/sugar_network_server/sneakernet.py b/sugar_network_server/sneakernet.py
new file mode 100644
index 0000000..2b482e7
--- /dev/null
+++ b/sugar_network_server/sneakernet.py
@@ -0,0 +1,274 @@
+# Copyright (C) 2012, Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import json
+import gzip
+import logging
+from glob import glob
+from os.path import abspath, isfile, join
+from gettext import gettext as _
+
+from active_document import env
+
+
+#: Callback to invoke during synchronization when the target directory is
+#: full; receives the volume `path` and, while importing, a prompt message
+next_volume_cb = None
+
+_HEADER_SIZE = 4096
+_RESERVED_SIZE = 1024 * 1024
+
+_logger = logging.getLogger('active_document.sneakernet')
+
+
+def sync_node(node, volume_path, merge_cb, diff):
+ _logger.info(_('Synchronize with %s directory'), volume_path)
+
+ for packet in _import(volume_path, merge_cb):
+ sender = packet.header.get('sender')
+ if sender == node:
+ _logger.debug('Remove existing %s packet', packet.path)
+ os.unlink(packet.path)
+ elif sender == 'master' and packet.header.get('to') == node:
+ for row in packet.read_rows(type='ack'):
+ merge_cb(packet.header, row)
+ _logger.debug('Remove loaded %s packet', packet.path)
+ os.unlink(packet.path)
+ else:
+ for row in packet.read_rows(type=['diff', 'request']):
+ if row['type'] == 'diff':
+ merge_cb(packet.header, row)
+ if sender != 'master':
+ del packet.syns[:]
+
+ return _export(volume_path, node, diff)
+
+
+def sync_master(volume_path, merge_cb, diff):
+ _logger.info(_('Synchronize with %s directory'), volume_path)
+
+ for packet in _import(volume_path, merge_cb):
+ if packet.header.get('sender') == 'master':
+ del packet.syns[:]
+ else:
+ for row in packet.read_rows(type=['diff', 'request']):
+ merge_cb(packet.header, row)
+ _logger.debug('Remove loaded %s packet', packet.path)
+ os.unlink(packet.path)
+
+ return _export(volume_path, 'master', diff)
+
+
+class _InPacket(object):
+
+ def __init__(self, path):
+ self.path = abspath(path)
+ self._zip = None
+ self._row = None
+ self.header = {}
+ self.syns = []
+
+ if not isfile(path):
+ return
+
+ self._zip = gzip.GzipFile(path)
+ header = self._read(size=_HEADER_SIZE, subject='Sugar Network Packet')
+ if header is None:
+ _logger.info(_('Skip unrecognized input packet file, %s'), path)
+ self.close()
+ else:
+ _logger.info(_('Open input packet file, %s: %r'), path, header)
+ self.header = header
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ @property
+ def opened(self):
+ return self._zip is not None
+
+ def read_rows(self, **kwargs):
+ while True:
+ row = self._read(**kwargs)
+ if row is None:
+ break
+ elif row is False:
+ continue
+ yield row
+
+ def close(self):
+ if self._zip is not None:
+ self._zip.close()
+ self._zip = None
+
+ def _read(self, size=None, **kwargs):
+ try:
+ if self._row is None:
+ if not self.opened:
+ return
+ row = self._zip.readline(size)
+ if not row:
+ _logger.warning(_('EOF for packet file, %s'), self.path)
+ self.close()
+ return
+ row = dict(json.loads(row))
+ else:
+ row = self._row
+
+ if row.get('type') == 'syn':
+ self.syns.append(row)
+ self._row = None
+ return False
+
+ for key, value in kwargs.items():
+ if type(value) == list:
+ if row.get(key) in value:
+ continue
+ elif row.get(key) == value:
+ continue
+ self._row = row
+ return None
+
+ self._row = None
+ return row
+
+ except (IOError, ValueError, TypeError), error:
+ _logger.warning(_('Malformed input packet file %r: %s'),
+ self.path, error)
+ self.close()
+
+
+class _OutPacket(object):
+
+ def __init__(self, root, **kwargs):
+ self._root = abspath(root)
+ self.header = {'subject': 'Sugar Network Packet'}
+ for key, value in kwargs.items():
+ if value is not None:
+ self.header[key] = value
+ self._zip = None
+ self._counter = 0
+ self.path = None
+ self._packets = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def close(self):
+ if self._zip is not None:
+ self._zip.close()
+ self._zip = None
+ self._counter = 0
+
+ def write_row(self, **kwargs):
+ if self._zip is None or self._counter >= _RESERVED_SIZE:
+ self._next_volume()
+ if kwargs.get('type') == 'syn':
+ kwargs['packets'] = self._packets
+ self._write_row(kwargs)
+
+ def _next_volume(self):
+ next_guid = env.uuid()
+ if self._zip is not None:
+ self._write_row({'type': 'part', 'next': next_guid})
+ self.header['prev'] = self.header['guid']
+ self.close()
+
+ while True:
+ stat = os.statvfs(self._root)
+ if stat.f_bfree * stat.f_frsize >= _RESERVED_SIZE * 2:
+ break
+ # pylint: disable-msg=E1102
+ if next_volume_cb is None or not next_volume_cb(self._root):
+ raise IOError(_('No free disk space in %r') % self._root)
+ _logger.info(_('Switched volumes for %r'), self._root)
+
+ self._packets.append(next_guid)
+ self.header['guid'] = next_guid
+ self.path = join(self._root, '%s.packet.gz' % next_guid)
+
+ _logger.info(_('Open output packet file %r'), self.path)
+ self._zip = gzip.GzipFile(self.path, 'w')
+ self.write_row(**self.header)
+
+ def _write_row(self, kwargs):
+ data = json.dumps(kwargs)
+ self._zip.write(data)
+ self._zip.write('\n')
+ self._counter += len(data) + 1
+
+
+def _import(volume_path, merge_cb):
+ processed = set()
+ processed_guids = set()
+ syns = []
+
+ while True:
+ parts = set()
+
+ for packet_path in glob(join(volume_path, '*.packet.gz')):
+ if packet_path in processed:
+ continue
+ processed.add(packet_path)
+ with _InPacket(packet_path) as packet:
+ processed_guids.add(packet.header.get('guid'))
+ if 'prev' in packet.header:
+ parts.add(packet.header['prev'])
+ yield packet
+ for i in packet.syns:
+ syns.append((packet.header, i))
+ for row in packet.read_rows(type='part'):
+ parts.add(row['next'])
+
+ if parts and next_volume_cb is not None:
+ part_names = ', '.join(['%s.packet.gz' % i for i in parts])
+ # pylint: disable-msg=E1102
+ if next_volume_cb(volume_path,
+ _('Change %s volume to load %s packet(s)') % \
+ (volume_path, part_names)):
+ continue
+ break
+
+ for header, row in syns:
+ if not (set(row.get('packets', [])) - processed_guids):
+ merge_cb(header, row)
+
+
+def _export(volume_path, sender, diff):
+ packet = None
+ last_to = False
+ result = None
+
+ try:
+ for to, row in diff:
+ if to != last_to:
+ last_to = to
+ if packet is not None:
+ packet.close()
+ packet = _OutPacket(volume_path, sender=sender, to=to)
+ packet.write_row(**row)
+ finally:
+ if packet is not None:
+ packet.close()
+ result = packet.path
+
+ return result
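
For orientation (not part of the patch): a minimal sketch of how a node-side caller might drive the module above. The node GUID, mount point, and callback bodies are illustrative placeholders, assuming the sugar_network_server package is importable.

from sugar_network_server import sneakernet

def merge_cb(header, row):
    # Apply an incoming 'diff' or 'ack' row to the local document storage
    print 'Merging %r row from packet %r' % (row.get('type'), header.get('guid'))

def next_volume_cb(path, message=None):
    # Called when `path` runs out of space or a multi-part packet continues
    # on another volume; return True once the operator has swapped the media
    return False

sneakernet.next_volume_cb = next_volume_cb

# Rows to export, as (recipient, row) pairs; sync_node() writes them to one
# or more *.packet.gz files on the volume and returns the last packet path
outgoing = [('master', {'type': 'request', 'seqno': 1})]
sneakernet.sync_node('node-guid', '/media/usb-stick', merge_cb, outgoing)
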
diff --git a/tests/__init__.py b/tests/__init__.py
index e5cebab..c668b77 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -10,6 +10,7 @@ from os.path import dirname, join, exists, abspath
import active_document as ad
import restful_document as rd
from active_toolkit import coroutine
+from sugar_network_server import sneakernet
root = abspath(dirname(__file__))
tmproot = join(root, '.tmp')
@@ -41,6 +42,8 @@ class Test(unittest.TestCase):
ad.find_limit.value = 1024
ad.index_write_queue.value = 10
+ sneakernet.next_volume_cb = None
+
self._logfile = file(self.logfile + '.out', 'a')
sys.stdout = sys.stderr = self._logfile
diff --git a/tests/units/__main__.py b/tests/units/__main__.py
index 6b22e1e..346e743 100644
--- a/tests/units/__main__.py
+++ b/tests/units/__main__.py
@@ -3,6 +3,7 @@
from __init__ import tests
from sequence import *
+from sneakernet import *
from user import *
diff --git a/tests/units/sneakernet.py b/tests/units/sneakernet.py
new file mode 100755
index 0000000..7101ab4
--- /dev/null
+++ b/tests/units/sneakernet.py
@@ -0,0 +1,467 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import gzip
+import json
+from glob import glob
+from os.path import exists
+
+from __init__ import tests
+
+from sugar_network_server import sneakernet
+
+
+class SneakernetTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+ self.override(os, 'statvfs', lambda *args: statvfs())
+
+ def test_InPacket_WrongFile(self):
+ self.packet('test.gz', [])
+ packet = sneakernet._InPacket('test.gz')
+ assert not packet.opened
+
+ self.packet('test.gz', [{'subject': 'bar'}])
+ packet = sneakernet._InPacket('test.gz')
+ assert not packet.opened
+
+ self.packet('test.gz', [{'subject': 'Sugar Network Packet'}])
+ packet = sneakernet._InPacket('test.gz')
+ assert packet.opened
+ self.assertEqual(None, packet.header.get('sender'))
+ self.assertEqual(None, packet.header.get('receiver'))
+
+ self.packet('test.gz', [{'subject': 'Sugar Network Packet', 'sender': 'me', 'receiver': 'you'}])
+ packet = sneakernet._InPacket('test.gz')
+ assert packet.opened
+ self.assertEqual('me', packet.header.get('sender'))
+ self.assertEqual('you', packet.header.get('receiver'))
+
+ def test_InPacket(self):
+ self.packet('test.gz', [
+ {'subject': 'Sugar Network Packet'},
+ ])
+ packet = sneakernet._InPacket('test.gz')
+ self.assertEqual(
+ [],
+ [i for i in packet.read_rows(type='probe')])
+ assert not packet.opened
+
+ self.packet('test.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'probe', 'foo': 'bar'},
+ ])
+ packet = sneakernet._InPacket('test.gz')
+ self.assertEqual(
+ [{'type': 'probe', 'foo': 'bar'}],
+ [i for i in packet.read_rows(type='probe')])
+ assert not packet.opened
+
+ self.packet('test.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'probe', 'foo': 1},
+ {'type': 'probe', 'foo': 2},
+ {'type': 'stop'},
+ {'type': 'probe', 'foo': 3}
+ ])
+ packet = sneakernet._InPacket('test.gz')
+ self.assertEqual(
+ [{'type': 'probe', 'foo': 1}, {'type': 'probe', 'foo': 2}],
+ [i for i in packet.read_rows(type='probe')])
+ assert packet.opened
+ self.assertEqual(
+ [{'type': 'stop'}],
+ [i for i in packet.read_rows(type='stop')])
+ assert packet.opened
+ self.assertEqual(
+ [{'type': 'probe', 'foo': 3}],
+ [i for i in packet.read_rows(type='probe')])
+ assert not packet.opened
+
+ def test_OutPacket(self):
+ out_packet = sneakernet._OutPacket('.', sender='me')
+ out_packet.close()
+ assert out_packet.path is None
+
+ out_packet = sneakernet._OutPacket('.', sender='me')
+ out_packet.write_row(foo='bar')
+ out_packet.write_row(bar='foo')
+ out_packet.close()
+ assert exists(out_packet.path)
+ in_packet = sneakernet._InPacket(out_packet.path)
+ self.assertEqual('me', in_packet.header.get('sender'))
+ self.assertEqual(
+ [{'foo': 'bar'}, {'bar': 'foo'}],
+ [i for i in in_packet.read_rows()])
+
+ def test_OutPacket_DiskFull(self):
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2 - 1
+ out_packet = sneakernet._OutPacket('.', sender='me')
+ self.assertRaises(IOError, out_packet.write_row, foo='bar')
+ out_packet.close()
+ assert out_packet.path is None
+
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
+ out_packet = sneakernet._OutPacket('.', sender='me')
+ out_packet.write_row(foo='bar')
+ out_packet.close()
+ in_packet = sneakernet._InPacket(out_packet.path)
+ self.assertEqual('me', in_packet.header.get('sender'))
+ self.assertEqual(
+ [{'foo': 'bar'}],
+ [i for i in in_packet.read_rows()])
+
+ def test_OutPacket_SwitchVolumes(self):
+ switches = []
+
+ def next_volume_cb(path, *args):
+ switches.append(path)
+ if len(switches) == 3:
+ statvfs.f_bfree += 1
+ return True
+
+ sneakernet.next_volume_cb = next_volume_cb
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2 - 1
+ out_packet = sneakernet._OutPacket('.', sender='me')
+ out_packet.write_row(foo='bar')
+ out_packet.close()
+
+ self.assertEqual(
+ [tests.tmpdir] * 3,
+ switches)
+ self.assertEqual(
+ [{'foo': 'bar'}],
+ [i for i in sneakernet._InPacket(out_packet.path).read_rows()])
+
+ def test_OutPacket_WriteToSeveralVolumes(self):
+ switches = []
+ out_packet = sneakernet._OutPacket('.', sender='me')
+
+ def next_volume_cb(path, *args):
+ switches.append(path)
+ os.rename(out_packet.path, '%s.gz' % len(switches))
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
+ if len(switches) == 3:
+ raise RuntimeError('!!')
+ return True
+
+ sneakernet.next_volume_cb = next_volume_cb
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
+ out_packet.write_row(write=1, data='*' * sneakernet._RESERVED_SIZE)
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE
+ out_packet.write_row(write=2, data='*' * sneakernet._RESERVED_SIZE)
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE
+ out_packet.write_row(write=3, data='*' * sneakernet._RESERVED_SIZE)
+ out_packet.close()
+
+ self.assertEqual(
+ [tests.tmpdir] * 2,
+ switches)
+
+ in_packet_1 = sneakernet._InPacket('1.gz')
+ in_packet_2 = sneakernet._InPacket('2.gz')
+ in_packet_3 = sneakernet._InPacket(out_packet.path)
+
+ assert in_packet_1.header['guid']
+ assert in_packet_1.header['guid'] != in_packet_2.header['guid']
+ assert in_packet_1.header['guid'] != in_packet_3.header['guid']
+ self.assertEqual('me', in_packet_1.header.get('sender'))
+ self.assertEqual(
+ [(1, None), (None, in_packet_2.header['guid'])],
+ [(i.get('write'), i.get('next')) for i in in_packet_1.read_rows()])
+ assert 'prev' not in in_packet_1.header
+
+ assert in_packet_2.header['guid']
+ assert in_packet_2.header['guid'] != in_packet_1.header['guid']
+ assert in_packet_2.header['guid'] != in_packet_3.header['guid']
+ self.assertEqual('me', in_packet_2.header.get('sender'))
+ self.assertEqual(
+ [(2, None), (None, in_packet_3.header['guid'])],
+ [(i.get('write'), i.get('next')) for i in in_packet_2.read_rows()])
+ self.assertEqual(in_packet_1.header['guid'], in_packet_2.header.get('prev'))
+
+ assert in_packet_3.header['guid']
+ assert in_packet_3.header['guid'] != in_packet_1.header['guid']
+ assert in_packet_3.header['guid'] != in_packet_2.header['guid']
+ self.assertEqual('me', in_packet_3.header.get('sender'))
+ self.assertEqual(
+ [(3, None)],
+ [(i.get('write'), i.get('next')) for i in in_packet_3.read_rows()])
+ self.assertEqual(in_packet_2.header['guid'], in_packet_3.header.get('prev'))
+
+ def test_node_import_SupportedTypes(self):
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'foo'},
+ {'type': 'diff'},
+ {'type': 'syn'},
+ {'type': 'ack'},
+ {'type': 'request'},
+ {'type': 'foo'},
+ ])
+ rows = []
+ sneakernet.sync_node('node', '.', lambda h, x: rows.append(x), [])
+ self.assertEqual(
+ sorted([{'type': 'diff'}]),
+ sorted(rows))
+ assert exists('test.packet.gz')
+
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'master'},
+ {'type': 'diff'},
+ {'type': 'syn'},
+ {'type': 'ack'},
+ {'type': 'request'},
+ {'type': 'foo'},
+ ])
+ rows = []
+ sneakernet.sync_node('node', '.', lambda h, x: rows.append(x), [])
+ self.assertEqual(
+ sorted([{'type': 'diff'}, {'type': 'syn'}]),
+ sorted(rows))
+ assert exists('test.packet.gz')
+
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'node'},
+ {'type': 'diff'},
+ {'type': 'syn'},
+ {'type': 'ack'},
+ {'type': 'request'},
+ {'type': 'foo'},
+ ])
+ rows = []
+ sneakernet.sync_node('node', '.', lambda h, x: rows.append(x), [])
+ self.assertEqual(
+ sorted([]),
+ sorted(rows))
+ assert not exists('test.packet.gz')
+
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'master', 'to': 'node'},
+ {'type': 'ack'},
+ {'type': 'diff'},
+ {'type': 'syn'},
+ {'type': 'request'},
+ {'type': 'foo'},
+ ])
+ rows = []
+ sneakernet.sync_node('node', '.', lambda h, x: rows.append(x), [])
+ self.assertEqual(
+ sorted([{'type': 'ack'}]),
+ sorted(rows))
+ assert not exists('test.packet.gz')
+
+ def test_master_import_SupportedTypes(self):
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff'},
+ {'type': 'syn'},
+ {'type': 'request'},
+ {'type': 'foo'},
+ ])
+
+ rows = []
+ sneakernet.sync_master('.', lambda h, x: rows.append(x), [])
+
+ self.assertEqual(
+ sorted([{'type': 'diff'}, {'type': 'syn'}, {'type': 'request'}]),
+ sorted(rows))
+
+ def test_master_import_SkipMaster(self):
+ self.packet('test.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'master'},
+ {'type': 'diff'},
+ ])
+
+ rows = []
+ sneakernet.sync_master('.', lambda h, x: rows.append(x), [])
+
+ self.assertEqual([], rows)
+
+ def test_master_export_CleanupImported(self):
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'master'},
+ {'type': 'diff'},
+ ])
+ self.packet('2.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'master'},
+ {'type': 'diff'},
+ ])
+
+ packet = sneakernet.sync_master('.', lambda h, x: None, [('node', {})])
+
+ assert not exists('1.packet.gz')
+ assert not exists('2.packet.gz')
+ assert exists(packet)
+
+ def test_node_export_CleanupExisting(self):
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'node-1'},
+ {'type': 'diff'},
+ ])
+ self.packet('2.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'sender': 'node-2'},
+ {'type': 'diff'},
+ ])
+
+ packet_1 = sneakernet.sync_node('node-1', '.', lambda h, row: None, [('master', {})])
+
+ assert not exists('1.packet.gz')
+ assert exists('2.packet.gz')
+ assert exists(packet_1)
+
+ def test_sync_node_Volumes(self):
+ volumes = []
+ def next_volume_cb(*args):
+ if not volumes:
+ volumes.append(True)
+ return True
+ sneakernet.next_volume_cb = next_volume_cb
+
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff', 'seqno': 1},
+ {'type': 'part', 'next': '2'},
+ ])
+ imported = []
+ sneakernet.sync_node('node-1', '.', lambda h, x: imported.append(x['seqno']), [])
+ self.assertEqual([1], imported)
+
+ def next_volume_cb(*args):
+ if not exists('2.packet.gz'):
+ self.packet('2.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'prev': '3'},
+ {'type': 'diff', 'seqno': 2},
+ ])
+ return True
+ if not exists('3.packet.gz'):
+ self.packet('3.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff', 'seqno': 3},
+ {'type': 'part', 'next': '1'},
+ ])
+ return True
+ sneakernet.next_volume_cb = next_volume_cb
+
+ imported = []
+ sneakernet.sync_node('node-1', '.', lambda h, x: imported.append(x['seqno']), [])
+ self.assertEqual([1, 2, 3], imported)
+
+ def test_sync_master_Volumes(self):
+ volumes = []
+ def next_volume_cb(*args):
+ if not volumes:
+ volumes.append(True)
+ return True
+ sneakernet.next_volume_cb = next_volume_cb
+
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff', 'seqno': 1},
+ {'type': 'part', 'next': '2'},
+ ])
+ imported = []
+ sneakernet.sync_master('.', lambda h, x: imported.append(x['seqno']), [])
+ self.assertEqual([1], imported)
+
+ volumes = []
+ def next_volume_cb(*args):
+ if len(volumes) == 0:
+ self.packet('2.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'prev': '3'},
+ {'type': 'diff', 'seqno': 2},
+ ])
+ volumes.append(True)
+ return True
+ if len(volumes) == 1:
+ self.packet('3.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff', 'seqno': 3},
+ {'type': 'part', 'next': '1'},
+ ])
+ volumes.append(True)
+ return True
+ sneakernet.next_volume_cb = next_volume_cb
+
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet'},
+ {'type': 'diff', 'seqno': 1},
+ {'type': 'part', 'next': '2'},
+ ])
+ imported = []
+ sneakernet.sync_master('.', lambda h, x: imported.append(x['seqno']), [])
+ self.assertEqual([1, 2, 3], imported)
+
+ def test_SYNOnlyForImportedPackets(self):
+ self.packet('1.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'guid': '1', 'sender': 'master'},
+ {'type': 'syn', 'packets': ['1'], 'mark': '1'},
+ {'type': 'syn', 'packets': ['1', 'foo'], 'mark': '2'},
+ {'type': 'part', 'next': '2'},
+ ])
+ self.packet('2.packet.gz', [
+ {'subject': 'Sugar Network Packet', 'guid': '2', 'prev': '1'},
+ {'type': 'syn', 'packets': ['1', '2'], 'mark': '3'},
+ {'type': 'syn', 'packets': ['foo', '2'], 'mark': '4'},
+ {'type': 'syn', 'packets': ['1'], 'mark': '5'},
+ ])
+
+ node_syns = []
+ sneakernet.sync_node('node', '.', lambda h, x: node_syns.append(x['mark']), [])
+ master_syns = []
+ sneakernet.sync_master('.', lambda h, x: master_syns.append(x['mark']), [])
+
+ self.assertEqual(sorted(['1']), sorted(node_syns))
+ self.assertEqual(sorted(['3', '5']), sorted(master_syns))
+
+ def test_SYNOnlyForExportedPackets(self):
+
+ def next_volume_cb(path, *args):
+ statvfs.f_bfree += 1
+ return True
+ sneakernet.next_volume_cb = next_volume_cb
+
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
+ sneakernet.sync_master('.', lambda h, x: None, [
+ ('node', {'type': 'syn', 'probe': 1, 'ballast': '*' * sneakernet._RESERVED_SIZE}),
+ ('node', {'type': 'syn', 'probe': 2}),
+ ])
+
+ statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
+ sneakernet.sync_node('node2', '.', lambda h, x: None, [
+ ('node', {'type': 'syn', 'probe': 3, 'ballast': '*' * sneakernet._RESERVED_SIZE}),
+ ('node', {'type': 'syn', 'probe': 4}),
+ ])
+
+ packets = []
+ for path in glob('*.packet.gz'):
+ with sneakernet._InPacket(path) as packet:
+ for row in packet.read_rows(type='syn'):
+ pass
+ for row in packet.syns:
+ packets.append((row['probe'], len(row.get('ballast', []))))
+ self.assertEqual(
+ sorted([
+ (1, sneakernet._RESERVED_SIZE),
+ (2, 0),
+ (3, sneakernet._RESERVED_SIZE),
+ (4, 0),
+ ]),
+ sorted(packets))
+
+ def packet(self, filename, data):
+ bundle = gzip.GzipFile(filename, 'w')
+ for i in data:
+ bundle.write(json.dumps(i) + '\n')
+ bundle.close()
+
+
+class statvfs(object):
+
+ f_bfree = sneakernet._RESERVED_SIZE * 10
+ f_frsize = 1
+
+
+if __name__ == '__main__':
+ tests.main()
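
As the packet() helper in the tests suggests, a packet file is simply a gzip stream of newline-separated JSON rows whose first row is the header. A hand-rolled packet can be produced with the standard library alone (the file name is illustrative):

import gzip
import json

bundle = gzip.GzipFile('demo.packet.gz', 'w')
for row in [
        {'subject': 'Sugar Network Packet', 'sender': 'node-1'},
        {'type': 'diff', 'seqno': 1},
        ]:
    bundle.write(json.dumps(row) + '\n')
bundle.close()

# sneakernet._InPacket('demo.packet.gz') would then report 'node-1' as the
# sender and yield the single row from read_rows(type='diff').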