From 960845fe3c1792a49ed6dde7db86b9516b9cc08c Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Tue, 12 Jun 2012 11:06:32 +0000 Subject: Move sneakernet code from ad --- diff --git a/sugar_network_server/sneakernet.py b/sugar_network_server/sneakernet.py new file mode 100644 index 0000000..2b482e7 --- /dev/null +++ b/sugar_network_server/sneakernet.py @@ -0,0 +1,274 @@ +# Copyright (C) 2012, Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Exchange synchronization packets through a directory (removable volume).

A packet is a gzipped file named ``<guid>.packet.gz`` that contains one JSON
object per line.  The first line is the header (it carries
``"subject": "Sugar Network Packet"``), the remaining lines are rows that
are distinguished by their ``type`` key.
"""

import os
import json
import gzip
import logging
from glob import glob
from os.path import abspath, isfile, join
from gettext import gettext as _

from active_document import env


#: Callback to execute when a volume needs to be switched. It is called as
#: `next_volume_cb(path)` while exporting if the target directory is full,
#: and as `next_volume_cb(path, message)` while importing if packets refer
#: to parts that might be on another volume. A true return value means
#: that processing should be retried on `path`.
next_volume_cb = None

# Maximum number of bytes to read for the packet header line
_HEADER_SIZE = 4096
# Number of written bytes after which an output packet is split; the export
# also requires twice this amount of free space before opening a new packet
_RESERVED_SIZE = 1024 * 1024

_logger = logging.getLogger('active_document.sneakernet')


def sync_node(node, volume_path, merge_cb, diff):
    """Import packets from `volume_path` on a node and export `diff`.

    Packets are handled depending on their header:

    * sent by `node` itself, the packet file is removed without merging;
    * sent by `master` to `node`, `ack` rows are merged and the packet file
      is removed;
    * any other packet, `diff` rows are merged (`request` rows are read
      but skipped), the file is kept; `syn` rows are kept only for packets
      sent by `master`.

    :param node: identity of the local node
    :param volume_path: directory with `*.packet.gz` files
    :param merge_cb: callable invoked as `merge_cb(header, row)`
    :param diff: iterable of `(to, row)` tuples to export
    :returns: path to the last written packet, `None` if nothing was written

    """
    _logger.info(_('Synchronize with %s directory'), volume_path)

    for packet in _import(volume_path, merge_cb):
        sender = packet.header.get('sender')
        if sender == node:
            _logger.debug('Remove existing %s packet', packet.path)
            os.unlink(packet.path)
        elif sender == 'master' and packet.header.get('to') == node:
            for row in packet.read_rows(type='ack'):
                merge_cb(packet.header, row)
            _logger.debug('Remove loaded %s packet', packet.path)
            os.unlink(packet.path)
        else:
            for row in packet.read_rows(type=['diff', 'request']):
                if row['type'] == 'diff':
                    merge_cb(packet.header, row)
            if sender != 'master':
                # Prevent `_import` from merging these `syn` rows
                del packet.syns[:]

    return _export(volume_path, node, diff)


def sync_master(volume_path, merge_cb, diff):
    """Import packets from `volume_path` on the master and export `diff`.

    Packets sent by `master` are not read; from all other packets `diff`
    and `request` rows are merged. Every found packet file is removed.

    :param volume_path: directory with `*.packet.gz` files
    :param merge_cb: callable invoked as `merge_cb(header, row)`
    :param diff: iterable of `(to, row)` tuples to export
    :returns: path to the last written packet, `None` if nothing was written

    """
    _logger.info(_('Synchronize with %s directory'), volume_path)

    for packet in _import(volume_path, merge_cb):
        if packet.header.get('sender') == 'master':
            # Prevent `_import` from merging these `syn` rows
            del packet.syns[:]
        else:
            for row in packet.read_rows(type=['diff', 'request']):
                merge_cb(packet.header, row)
        _logger.debug('Remove loaded %s packet', packet.path)
        os.unlink(packet.path)

    return _export(volume_path, 'master', diff)


class _InPacket(object):
    """Sequential reader of a packet file.

    If `path` is not a file, or its first row is not a packet header, the
    object stays closed with an empty `header`.

    """

    def __init__(self, path):
        self.path = abspath(path)
        self._zip = None
        # Row that was read but did not match the filter of the last call
        self._row = None
        #: Header row of the packet, empty for not recognized files
        self.header = {}
        #: `syn` rows that were met while reading
        self.syns = []

        if not isfile(path):
            return

        self._zip = gzip.GzipFile(path)
        header = self._read(size=_HEADER_SIZE, subject='Sugar Network Packet')
        # `_read` returns `False` if the first row is a `syn` one,
        # that is not a header either
        if not header:
            _logger.info(_('Skip not recognized input packet file, %s'), path)
            self.close()
            # Do not let rows of a rejected file leak to readers
            self._row = None
            del self.syns[:]
        else:
            _logger.info(_('Open input packet file, %s: %r'), path, header)
            self.header = header

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def opened(self):
        """Whether the underlying file is still opened."""
        return self._zip is not None

    def read_rows(self, **kwargs):
        """Iterate over consecutive rows that match the `kwargs` filter.

        Every keyword is a row key; its value is either the expected value
        or a list of accepted values. Iteration stops on the first row that
        does not match (the row will be reused by the next call), on end of
        file, or on malformed data. `syn` rows are never yielded, they are
        collected in `syns`.

        """
        while True:
            row = self._read(**kwargs)
            if row is None:
                break
            elif row is False:
                continue
            yield row

    def close(self):
        """Close the underlying file, if it is opened."""
        if self._zip is not None:
            self._zip.close()
            self._zip = None

    def _read(self, size=None, **kwargs):
        """Read the next row.

        :param size: maximum number of bytes to read, `None` for no limit
        :param kwargs: filter, see `read_rows`
        :returns:
            row dictionary on success; `False` if a `syn` row was consumed;
            `None` if the row does not match the filter, the packet is
            closed or exhausted, or the file content is malformed

        """
        try:
            if self._row is None:
                if not self.opened:
                    return None
                # Pass an explicit "no limit" value instead of relying on
                # how `readline` treats `None`
                row = self._zip.readline(-1 if size is None else size)
                if not row:
                    _logger.warning(_('EOF for packet file, %s'), self.path)
                    self.close()
                    return None
                row = dict(json.loads(row))
            else:
                row = self._row

            if row.get('type') == 'syn':
                self.syns.append(row)
                self._row = None
                return False

            for key, value in kwargs.items():
                if isinstance(value, list):
                    if row.get(key) in value:
                        continue
                elif row.get(key) == value:
                    continue
                # Keep not matched row for the next call
                self._row = row
                return None

            self._row = None
            return row

        except (IOError, ValueError, TypeError) as error:
            _logger.warning(_('Malformed input packet file %r: %s'),
                    self.path, error)
            self.close()
            return None


class _OutPacket(object):
    """Writer of packet files to the `root` directory.

    Files are created lazily on the first `write_row` call. Once about
    `_RESERVED_SIZE` bytes of rows were written, the current file gets
    a final `part` row and writing continues in a new file whose header
    refers to the previous one with the `prev` key.

    """

    def __init__(self, root, **kwargs):
        self._root = abspath(root)
        #: Header to write to every packet file; `None` values are skipped
        self.header = {'subject': 'Sugar Network Packet'}
        for key, value in kwargs.items():
            if value is not None:
                self.header[key] = value
        self._zip = None
        # Number of bytes written to the current file (before compression)
        self._counter = 0
        #: Path to the last opened packet file, `None` if nothing was written
        self.path = None
        # Guids of all files opened by this object
        self._packets = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the current packet file, if there is one."""
        if self._zip is not None:
            self._zip.close()
            self._zip = None
            self._counter = 0

    def write_row(self, **kwargs):
        """Write a row, opening a new packet file beforehand if needed.

        `syn` rows get the `packets` key with guids of all files opened
        so far.

        :raises IOError:
            if there is not enough free space and volume was not switched

        """
        if self._zip is None or self._counter >= _RESERVED_SIZE:
            self._next_volume()
        if kwargs.get('type') == 'syn':
            kwargs['packets'] = self._packets
        self._write_row(kwargs)

    def _next_volume(self):
        """Finish the current file, if any, and open the next one."""
        next_guid = env.uuid()
        if self._zip is not None:
            self._write_row({'type': 'part', 'next': next_guid})
            self.header['prev'] = self.header['guid']
            self.close()

        while True:
            stat = os.statvfs(self._root)
            if stat.f_bfree * stat.f_frsize >= _RESERVED_SIZE * 2:
                break
            # pylint: disable-msg=E1102
            if next_volume_cb is None or not next_volume_cb(self._root):
                raise IOError(_('No free disk space in %r') % self._root)
            _logger.info(_('Switched volumes for %r'), self._root)

        self._packets.append(next_guid)
        self.header['guid'] = next_guid
        self.path = join(self._root, '%s.packet.gz' % next_guid)

        _logger.info(_('Open output packet file %r'), self.path)
        self._zip = gzip.GzipFile(self.path, 'w')
        self.write_row(**self.header)

    def _write_row(self, kwargs):
        """Serialize `kwargs` to the current file as one JSON line."""
        data = json.dumps(kwargs)
        self._zip.write(data)
        self._zip.write('\n')
        self._counter += len(data) + 1


def _import(volume_path, merge_cb):
    """Iterate over not yet processed packets in `volume_path`.

    Yielded `_InPacket` objects are supposed to be read by the caller. If
    packets refer to other parts (`prev` header key or `part` rows) and
    `next_volume_cb` is set, it is called to let the user switch volumes;
    a true result restarts directory scanning.

    When iteration is over, `syn` rows, which were left in `syns` of the
    yielded packets, are passed to `merge_cb(header, row)` if all packets
    mentioned in their `packets` key were processed.

    """
    processed = set()
    processed_guids = set()
    syns = []

    while True:
        parts = set()

        for packet_path in glob(join(volume_path, '*.packet.gz')):
            if packet_path in processed:
                continue
            processed.add(packet_path)
            with _InPacket(packet_path) as packet:
                processed_guids.add(packet.header.get('guid'))
                if 'prev' in packet.header:
                    parts.add(packet.header['prev'])
                yield packet
                # Collect only `syn` rows the caller has read (and kept);
                # ones consumed by the scan below are not taken into account
                for i in packet.syns:
                    syns.append((packet.header, i))
                for row in packet.read_rows(type='part'):
                    if 'next' in row:
                        parts.add(row['next'])

        if parts and next_volume_cb is not None:
            part_names = ', '.join(['%s.packet.gz' % i for i in parts])
            # pylint: disable-msg=E1102
            if next_volume_cb(volume_path,
                    _('Change %s volume to load %s packet(s)') % \
                            (volume_path, part_names)):
                continue
        break

    for header, row in syns:
        if not (set(row.get('packets', [])) - processed_guids):
            merge_cb(header, row)


def _export(volume_path, sender, diff):
    """Write `diff` rows to packets in `volume_path`.

    A new `_OutPacket` is started every time the receiver changes in the
    `diff` sequence.

    :param volume_path: directory to create packet files in
    :param sender: value for the `sender` header key
    :param diff: iterable of `(to, row)` tuples
    :returns: path to the last written packet, `None` if nothing was written

    """
    packet = None
    # `False` is a mark for "no receiver yet", `None` is a valid receiver
    last_to = False
    result = None

    try:
        for to, row in diff:
            if to != last_to:
                last_to = to
                if packet is not None:
                    packet.close()
                packet = _OutPacket(volume_path, sender=sender, to=to)
            packet.write_row(**row)
    finally:
        if packet is not None:
            packet.close()
            result = packet.path

    return result
def test_InPacket(self):
    """Rows are read in chunks of matching types until the end of file."""
    header = {'subject': 'Sugar Network Packet'}

    def read(in_packet, row_type):
        return list(in_packet.read_rows(type=row_type))

    # Header only
    self.packet('test.gz', [header])
    packet = sneakernet._InPacket('test.gz')
    self.assertEqual([], read(packet, 'probe'))
    assert not packet.opened

    # One row
    self.packet('test.gz', [header, {'type': 'probe', 'foo': 'bar'}])
    packet = sneakernet._InPacket('test.gz')
    self.assertEqual([{'type': 'probe', 'foo': 'bar'}], read(packet, 'probe'))
    assert not packet.opened

    # Rows of different types
    self.packet('test.gz', [
        header,
        {'type': 'probe', 'foo': 1},
        {'type': 'probe', 'foo': 2},
        {'type': 'stop'},
        {'type': 'probe', 'foo': 3},
        ])
    packet = sneakernet._InPacket('test.gz')

    first_chunk = read(packet, 'probe')
    self.assertEqual(
            [{'type': 'probe', 'foo': 1}, {'type': 'probe', 'foo': 2}],
            first_chunk)
    assert packet.opened

    second_chunk = read(packet, 'stop')
    self.assertEqual([{'type': 'stop'}], second_chunk)
    assert packet.opened

    third_chunk = read(packet, 'probe')
    self.assertEqual([{'type': 'probe', 'foo': 3}], third_chunk)
    assert not packet.opened
def test_OutPacket_WriteToSeveralVolumes(self):
    """Output is split into chained packets when volumes get full."""
    switches = []
    out_packet = sneakernet._OutPacket('.', sender='me')

    def next_volume_cb(path, *args):
        switches.append(path)
        # Emulate volume switch, move just written packet away
        os.rename(out_packet.path, '%s.gz' % len(switches))
        statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
        if len(switches) == 3:
            # Only two switches are expected; string exceptions are not
            # supported, raise a real one to get a readable failure
            raise AssertionError('Unexpected third volume switch')
        return True

    sneakernet.next_volume_cb = next_volume_cb
    statvfs.f_bfree = sneakernet._RESERVED_SIZE * 2
    out_packet.write_row(write=1, data='*' * sneakernet._RESERVED_SIZE)
    statvfs.f_bfree = sneakernet._RESERVED_SIZE
    out_packet.write_row(write=2, data='*' * sneakernet._RESERVED_SIZE)
    statvfs.f_bfree = sneakernet._RESERVED_SIZE
    out_packet.write_row(write=3, data='*' * sneakernet._RESERVED_SIZE)
    out_packet.close()

    self.assertEqual(
            [tests.tmpdir] * 2,
            switches)

    in_packet_1 = sneakernet._InPacket('1.gz')
    in_packet_2 = sneakernet._InPacket('2.gz')
    in_packet_3 = sneakernet._InPacket(out_packet.path)

    assert in_packet_1.header['guid']
    assert in_packet_1.header['guid'] != in_packet_2.header['guid']
    assert in_packet_1.header['guid'] != in_packet_3.header['guid']
    self.assertEqual('me', in_packet_1.header.get('sender'))
    self.assertEqual(
            [(1, None), (None, in_packet_2.header['guid'])],
            [(i.get('write'), i.get('next')) for i in in_packet_1.read_rows()])
    assert 'prev' not in in_packet_1.header

    assert in_packet_2.header['guid']
    assert in_packet_2.header['guid'] != in_packet_1.header['guid']
    assert in_packet_2.header['guid'] != in_packet_3.header['guid']
    self.assertEqual('me', in_packet_2.header.get('sender'))
    self.assertEqual(
            [(2, None), (None, in_packet_3.header['guid'])],
            [(i.get('write'), i.get('next')) for i in in_packet_2.read_rows()])
    self.assertEqual(in_packet_1.header['guid'], in_packet_2.header.get('prev'))

    assert in_packet_3.header['guid']
    assert in_packet_3.header['guid'] != in_packet_1.header['guid']
    assert in_packet_3.header['guid'] != in_packet_2.header['guid']
    self.assertEqual('me', in_packet_3.header.get('sender'))
    self.assertEqual(
            [(3, None)],
            [(i.get('write'), i.get('next')) for i in in_packet_3.read_rows()])
    self.assertEqual(in_packet_2.header['guid'], in_packet_3.header.get('prev'))
def test_master_import_SupportedTypes(self):
    """Master merges `diff`, `syn` and `request` rows, but not `foo`."""
    packet_rows = [{'type': i} for i in ('diff', 'syn', 'request', 'foo')]
    self.packet('test.packet.gz',
            [{'subject': 'Sugar Network Packet'}] + packet_rows)

    merged = []

    def merge_cb(header, row):
        merged.append(row)

    sneakernet.sync_master('.', merge_cb, [])

    expected = [{'type': 'diff'}, {'type': 'syn'}, {'type': 'request'}]
    self.assertEqual(sorted(expected), sorted(merged))
def test_master_export_CleanupImported(self):
    """Master removes found packets, even its own, and writes a new one."""
    existing = ('1.packet.gz', '2.packet.gz')
    for name in existing:
        self.packet(name, [
            {'subject': 'Sugar Network Packet', 'sender': 'master'},
            {'type': 'diff'},
            ])

    def merge_cb(header, row):
        return None

    exported = sneakernet.sync_master('.', merge_cb, [('node', {})])

    for name in existing:
        assert not exists(name)
    assert exists(exported)
def test_sync_master_Volumes(self):
    """Master follows `part`/`prev` references across switched volumes."""
    first_packet = [
        {'subject': 'Sugar Network Packet'},
        {'type': 'diff', 'seqno': 1},
        {'type': 'part', 'next': '2'},
        ]

    def sync():
        seqnos = []
        sneakernet.sync_master('.',
                lambda header, row: seqnos.append(row['seqno']), [])
        return seqnos

    # Volume is "switched" only once and no new packets appear
    accepted = []

    def accept_once(*args):
        if accepted:
            return None
        accepted.append(True)
        return True

    sneakernet.next_volume_cb = accept_once
    self.packet('1.packet.gz', first_packet)
    self.assertEqual([1], sync())

    # Every switch brings the next packet until there are none left
    pending = [
        ('2.packet.gz', [
            {'subject': 'Sugar Network Packet', 'prev': '3'},
            {'type': 'diff', 'seqno': 2},
            ]),
        ('3.packet.gz', [
            {'subject': 'Sugar Network Packet'},
            {'type': 'diff', 'seqno': 3},
            {'type': 'part', 'next': '1'},
            ]),
        ]

    def provide_next_packet(*args):
        if not pending:
            return None
        filename, rows = pending.pop(0)
        self.packet(filename, rows)
        return True

    sneakernet.next_volume_cb = provide_next_packet
    self.packet('1.packet.gz', first_packet)
    self.assertEqual([1, 2, 3], sync())
def packet(self, filename, data):
    """Create gzipped `filename` with `data` rows, one JSON object per line.

    :param filename: path of the file to (over)write
    :param data: iterable of JSON serializable objects

    """
    bundle = gzip.GzipFile(filename, 'w')
    try:
        for i in data:
            # `json.dumps` output is ASCII by default; encode it to write
            # bytes to the gzip stream
            bundle.write((json.dumps(i) + '\n').encode('utf-8'))
    finally:
        # Do not leak the file handle if serialization or writing fails
        bundle.close()