diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-08 12:34:17 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-08 12:34:17 (GMT) |
commit | c8ce1a074715d809dd3ec435978113e5b0c30ed7 (patch) | |
tree | c391a1064d9103fa94872319a5332fc7a9dc09b3 | |
parent | e297a16dce2c17a6ca6b1af5257008365b201b0f (diff) |
Keep node sync code out of local/
-rwxr-xr-x | sugar-network-server | 2 | ||||
-rwxr-xr-x | sugar-network-service | 9 | ||||
-rw-r--r-- | sugar_network/local/mounts.py | 136 | ||||
-rw-r--r-- | sugar_network/local/mountset.py | 140 | ||||
-rw-r--r-- | sugar_network/node/commands.py | 8 | ||||
-rw-r--r-- | sugar_network/node/sync_master.py | 8 | ||||
-rw-r--r-- | sugar_network/node/sync_node.py | 212 | ||||
-rw-r--r-- | sugar_network/toolkit/files_sync.py | 4 | ||||
-rw-r--r-- | sugar_network/toolkit/mounts_monitor.py | 76 | ||||
-rw-r--r-- | tests/__init__.py | 5 | ||||
-rwxr-xr-x | tests/units/dbus_client.py | 2 | ||||
-rwxr-xr-x | tests/units/mounts_monitor.py | 57 | ||||
-rwxr-xr-x | tests/units/mountset.py | 3 | ||||
-rwxr-xr-x | tests/units/node_mount.py | 2 | ||||
-rwxr-xr-x | tests/units/sync_master.py | 12 | ||||
-rwxr-xr-x | tests/units/sync_node.py | 123 |
16 files changed, 424 insertions, 375 deletions
diff --git a/sugar-network-server b/sugar-network-server index 928fe92..1878faf 100755 --- a/sugar-network-server +++ b/sugar-network-server @@ -55,7 +55,7 @@ class Application(application.Daemon): self.jobs.spawn(volume.populate) subscriber = SubscribeSocket(volume, node.host.value, node.subscribe_port.value) - cp = MasterCommands(volume, subscriber, node.sync_dirs.value) + cp = MasterCommands(volume, subscriber) logging.info('Listening for requests on %s:%s', node.host.value, node.port.value) diff --git a/sugar-network-service b/sugar-network-service index 4e2b18c..25bcb55 100755 --- a/sugar-network-service +++ b/sugar-network-service @@ -26,7 +26,7 @@ import sugar_network_webui as webui from sugar_network import local, node, Client from sugar_network.toolkit import sugar, sneakernet -from sugar_network.toolkit import crypto, dbus_thread +from sugar_network.toolkit import crypto, dbus_thread, mounts_monitor from sugar_network.local import activities, datastore from sugar_network.local.dbus_datastore import Datastore from sugar_network.local.dbus_network import Network @@ -71,7 +71,6 @@ class Application(application.Application): new_root = (local.local_root.value != local.local_root.default) local.local_root.value = abspath(local.local_root.value) - local.mounts_root.value = abspath(local.mounts_root.value) if new_root: application.logdir.value = join(local.local_root.value, 'log') @@ -151,7 +150,7 @@ class Application(application.Application): jobs = coroutine.Pool() volume = Volume(self._db_path, lazy_open=local.lazy_open.value) - mountset = Mountset(volume, node.sync_dirs.value) + mountset = Mountset(volume) mountset['~'] = HomeMount(volume) mountset['/'] = RemoteMount(volume) @@ -182,6 +181,9 @@ class Application(application.Application): server = coroutine.WSGIServer(host, webui.get_app()) jobs.spawn(server.serve_forever) + if local.mounts_root.value: + mounts_monitor.start(abspath(local.mounts_root.value)) + if local.delayed_start.value: mountset.connect(delayed_start, event='delayed-start') else: @@ -200,6 +202,7 @@ class Application(application.Application): except Exception: util.exception('%s aborted', self.name) finally: + mounts_monitor.stop() jobs.kill() mountset.close() diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py index 57ae353..6d75740 100644 --- a/sugar_network/local/mounts.py +++ b/sugar_network/local/mounts.py @@ -14,20 +14,17 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -import sys import json import shutil import logging from urlparse import urlparse -from os.path import isabs, exists, join, basename, dirname +from os.path import isabs, exists, join, basename from gettext import gettext as _ import sweets_recipe import active_document as ad from sweets_recipe import Bundle -from sugar_network.toolkit.collection import Sequence, PersistentSequence -from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull -from sugar_network.toolkit import sugar, http, sneakernet +from sugar_network.toolkit import sugar, http from sugar_network.local import activities, cache from sugar_network import local, checkin, sugar from active_toolkit import sockets, util, coroutine, enforce @@ -416,23 +413,23 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): class NodeMount(LocalMount, _ProxyCommands): - def __init__(self, volume, home_volume, file_syncs=None): + def __init__(self, volume, home_volume): LocalMount.__init__(self, volume) _ProxyCommands.__init__(self, home_volume) - self._file_syncs = file_syncs or {} - self._push_seq = PersistentSequence( - join(volume.root, 'push.sequence'), [1, None]) - self._pull_seq = PersistentSequence( - join(volume.root, 'pull.sequence'), [1, None]) - self._sync_session = None - self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync') - with file(join(volume.root, 'node')) as f: self._node_guid = f.read().strip() with file(join(volume.root, 'master')) as f: self._master_guid = f.read().strip() + @property + def node_guid(self): + return self._node_guid + + @property + def master_guid(self): + return self._master_guid + def call(self, request, response): return self._proxy_call(request, response, super(NodeMount, self).call) @@ -453,114 +450,3 @@ class NodeMount(LocalMount, _ProxyCommands): self._node_guid, extract) return meta - - def sync(self, path, accept_length=None, push_sequence=None, session=None): - to_push_seq = Sequence(empty_value=[1, None]) - if push_sequence is None: - to_push_seq.include(self._push_seq) - else: - to_push_seq = Sequence(push_sequence) - - if session is None: - session_is_new = True - session = ad.uuid() - else: - session_is_new = False - - while True: - for packet in sneakernet.walk(path): - if packet.header.get('src') == self._node_guid: - if packet.header.get('session') == session: - _logger.debug('Keep current session %r packet', packet) - else: - _logger.debug('Remove our previous %r packet', packet) - os.unlink(packet.path) - else: - self._import(packet, to_push_seq) - self._push_seq.commit() - self._pull_seq.commit() - - if exists(self._sync_script): - shutil.copy(self._sync_script, path) - - with OutFilePacket(path, limit=accept_length, src=self._node_guid, - dst=self._master_guid, session=session, - seqno=self.volume.seqno.value, - api_url=local.api_url.value) as packet: - if session_is_new: - for directory, sync in self._file_syncs.items(): - packet.push(cmd='files_pull', directory=directory, - sequence=sync.sequence) - packet.push(cmd='sn_pull', sequence=self._pull_seq) - - _logger.debug('Generating %r PUSH packet to %r', - packet, packet.path) - self.publish({ - 'event': 'sync_progress', - 'progress': _('Generating %r packet') % packet.basename, - }) - - try: - self.volume.diff(to_push_seq, packet) - except DiskFull: - return {'push_sequence': to_push_seq, 'session': session} - else: - break - - def sync_session(self, mounts, path=None): - _logger.debug('Start synchronization session with %r session ' - 'for %r mounts', self._sync_session, mounts) - - def sync(path): - self.publish({'event': 'sync_start', 'path': path}) - self._sync_session = self.sync(path, **(self._sync_session or {})) - return self._sync_session is None - - try: - while True: - if path and sync(path): - break - for mountpoint in mounts: - if sync(mountpoint): - break - break - except Exception, error: - util.exception(_logger, 'Failed to complete synchronization') - self.publish({'event': 'sync_error', 'error': str(error)}) - self._sync_session = None - - if self._sync_session is None: - _logger.debug('Synchronization completed') - self.publish({'event': 'sync_complete'}) - else: - _logger.debug('Postpone synchronization with %r session', - self._sync_session) - self.publish({'event': 'sync_continue'}) - - def _import(self, packet, to_push_seq): - self.publish({ - 'event': 'sync_progress', - 'progress': _('Reading %r packet') % basename(packet.path), - }) - _logger.debug('Processing %r PUSH packet from %r', packet, packet.path) - - from_master = (packet.header.get('src') == self._master_guid) - - for record in packet.records(): - cmd = record.get('cmd') - if cmd == 'sn_push': - self.volume.merge(record, increment_seqno=False) - elif from_master: - if cmd == 'sn_commit': - _logger.debug('Processing %r COMMIT from %r', - record, packet) - self._pull_seq.exclude(record['sequence']) - elif cmd == 'sn_ack' and record['dst'] == self._node_guid: - _logger.debug('Processing %r ACK from %r', record, packet) - self._push_seq.exclude(record['sequence']) - self._pull_seq.exclude(record['merged']) - to_push_seq.exclude(record['sequence']) - self.volume.seqno.next() - self.volume.seqno.commit() - elif record.get('directory') in self._file_syncs: - self._file_syncs[record['directory']].push(record) diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py index ae622a3..e9e081e 100644 --- a/sugar_network/local/mountset.py +++ b/sugar_network/local/mountset.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import locale import socket import logging @@ -21,48 +20,35 @@ from os.path import join, exists import active_document as ad -from sugar_network.toolkit.inotify import Inotify, \ - IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM from sugar_network import local, node -from sugar_network.toolkit import zeroconf, netlink, network -from sugar_network.toolkit.collection import MutableStack -from sugar_network.toolkit.files_sync import Leechers +from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor from sugar_network.local.mounts import LocalMount, NodeMount from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.node.commands import NodeCommands +from sugar_network.node.sync_node import SyncCommands from sugar_network.node.router import Router from sugar_network.resources.volume import Volume from active_toolkit import util, coroutine, enforce _DB_DIRNAME = '.sugar-network' -_SYNC_DIRNAME = '.sugar-network-sync' - -_COMPLETE_MOUNT_TIMEOUT = 3 _logger = logging.getLogger('local.mountset') -class Mountset(dict, ad.CommandsProcessor): +class Mountset(dict, ad.CommandsProcessor, SyncCommands): - def __init__(self, home_volume, sync_dirs=None): + def __init__(self, home_volume): dict.__init__(self) ad.CommandsProcessor.__init__(self) + SyncCommands.__init__(self, local.path('sync')) self.opened = coroutine.Event() - self.home_volume = home_volume - if sync_dirs is None: - self._file_syncs = {} - else: - self._file_syncs = Leechers(sync_dirs, - join(home_volume.root, 'files')) self._subscriptions = {} self._locale = locale.getdefaultlocale()[0].replace('_', '-') self._jobs = coroutine.Pool() self._servers = coroutine.Pool() - self._sync_dirs = MutableStack() - self._sync = coroutine.Pool() def __getitem__(self, mountpoint): enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint) @@ -100,26 +86,6 @@ class Mountset(dict, ad.CommandsProcessor): mount.set_mounted(True) return mount.mounted - @ad.volume_command(method='POST', cmd='start_sync') - def start_sync(self, rewind=False, path=None): - if self._sync: - return - - enforce(path or self._sync_dirs, 'No mounts to synchronize with') - - for mount in self.values(): - if isinstance(mount, NodeMount): - if rewind: - self._sync_dirs.rewind() - self._sync.spawn(mount.sync_session, self._sync_dirs, path) - break - else: - raise RuntimeError('No mounted servers') - - @ad.volume_command(method='POST', cmd='break_sync') - def break_sync(self): - self._sync.kill() - @ad.volume_command(method='POST', cmd='publish') def republish(self, request): self.publish(request.content) @@ -128,28 +94,24 @@ class Mountset(dict, ad.CommandsProcessor): if response is None: response = ad.Response() request.accept_language = [self._locale] - mountpoint = None + mountpoint = request.get('mountpoint') - def process_call(): + try: try: - return ad.CommandsProcessor.call(self, request, response) + result = ad.CommandsProcessor.call(self, request, response) except ad.CommandNotFound: - mountpoint = request.pop('mountpoint') + request.pop('mountpoint') mount = self[mountpoint] if mountpoint == '/': mount.set_mounted(True) enforce(mount.mounted, '%r is not mounted', mountpoint) - return mount.call(request, response) - - try: - result = process_call() + result = mount.call(request, response) except Exception: - util.exception(_logger, - 'Failed to call %s on %r', request, mountpoint) + util.exception(_logger, 'Failed to call %s on %r', + request, mountpoint) raise - else: - _logger.debug('Called %s on %r: %r', request, mountpoint, result) + _logger.debug('Called %s on %r: %r', request, mountpoint, result) return result def connect(self, callback, condition=None, **kwargs): @@ -172,16 +134,8 @@ class Mountset(dict, ad.CommandsProcessor): def open(self): try: - mounts_root = local.mounts_root.value - if mounts_root: - for filename in os.listdir(mounts_root): - self._found_mount(join(mounts_root, filename)) - # In case if sync mounts processed before server mounts - # TODO More obvious code - for filename in os.listdir(mounts_root): - self._found_mount(join(mounts_root, filename)) - self._jobs.spawn(self._mounts_monitor) - + mounts_monitor.connect(_DB_DIRNAME, + self._found_mount, self._lost_mount) if '/' in self: if local.api_url.value: crawler = self._wait_for_master @@ -217,61 +171,23 @@ class Mountset(dict, ad.CommandsProcessor): # Otherwise, `socket.gethostbyname()` will return stale resolve network.res_init() - def _mounts_monitor(self): - root = local.mounts_root.value - _logger.info('Start monitoring %r for mounts', root) - - with Inotify() as monitor: - monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE | - IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM) - while not monitor.closed: - coroutine.select([monitor.fileno()], [], []) - for filename, event, __ in monitor.read(): - path = join(root, filename) - try: - if event & IN_DELETE_SELF: - _logger.warning('Lost %r, cannot monitor anymore', - root) - monitor.close() - break - elif event & (IN_DELETE | IN_MOVED_FROM): - self._lost_mount(path) - elif event & (IN_CREATE | IN_MOVED_TO): - # Right after moutning, access to directory - # might be restricted; let system enough time - # to complete mounting - coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT) - self._found_mount(path) - except Exception: - util.exception(_logger, 'Mount %r failed', path) - def _found_mount(self, path): - if exists(join(path, _DB_DIRNAME)) and path not in self: - _logger.debug('Found %r server mount', path) - volume, server_mode = self._mount_volume(path) - if server_mode: - self[path] = NodeMount(volume, self.home_volume, - self._file_syncs) - else: - self[path] = LocalMount(volume) - - if exists(join(path, _SYNC_DIRNAME)): - self._sync_dirs.add(path) - if self._servers: - _logger.debug('Found %r sync mount', path) - self.start_sync() - else: - _logger.debug('Found %r sync mount but no servers', path) + volume, server_mode = self._mount_volume(path) + if server_mode: + _logger.debug('Mount %r in node mode', path) + self[path] = self.node_mount = NodeMount(volume, self.home_volume) + else: + _logger.debug('Mount %r in node-less mode', path) + self[path] = LocalMount(volume) def _lost_mount(self, path): + mount = self.get(path) + if mount is None: + return _logger.debug('Lost %r mount', path) - - self._sync_dirs.remove(join(path, _SYNC_DIRNAME)) - if not self._sync_dirs: - self.break_sync() - - if path in self: - del self[path] + if isinstance(mount, NodeMount): + self.node_mount = None + del self[path] def _mount_volume(self, path): lazy_open = local.lazy_open.value diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index 29d721b..617ab63 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -20,7 +20,7 @@ from os.path import exists, join import active_document as ad from sugar_network import node -from sugar_network.node import sync_master +from sugar_network.node.sync_master import SyncCommands from active_toolkit import util, enforce @@ -137,11 +137,11 @@ class NodeCommands(ad.VolumeCommands): props['author'] = authors -class MasterCommands(NodeCommands, sync_master.Commands): +class MasterCommands(NodeCommands, SyncCommands): - def __init__(self, volume, subscriber=None, sync_dirs=None): + def __init__(self, volume, subscriber=None): NodeCommands.__init__(self, volume, subscriber) - sync_master.Commands.__init__(self, sync_dirs) + SyncCommands.__init__(self) def _load_pubkey(pubkey): diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py index 2d4fcb1..c1016a0 100644 --- a/sugar_network/node/sync_master.py +++ b/sugar_network/node/sync_master.py @@ -37,14 +37,14 @@ _PULL_QUEUE_SIZE = 256 _logger = logging.getLogger('node.sync_master') -class Commands(object): +class SyncCommands(object): _guid = None volume = None - def __init__(self, sync_dirs=None): - self._file_syncs = Seeders(sync_dirs, - join(node.data_root.value, 'files'), self.volume.seqno) + def __init__(self): + self._file_syncs = Seeders(node.sync_dirs.value, + join(node.data_root.value, 'sync'), self.volume.seqno) self._pull_queue = lrucache(_PULL_QUEUE_SIZE, lambda key, pull: pull.unlink()) diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py new file mode 100644 index 0000000..c887ccd --- /dev/null +++ b/sugar_network/node/sync_node.py @@ -0,0 +1,212 @@ +# Copyright (C) 2012 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import sys +import shutil +import logging +from os.path import join, dirname, exists, basename +from gettext import gettext as _ + +import active_document as ad +from sugar_network import node, local +from sugar_network.toolkit import mounts_monitor, sneakernet +from sugar_network.toolkit.collection import MutableStack +from sugar_network.toolkit.files_sync import Leechers +from sugar_network.toolkit.collection import Sequence, PersistentSequence +from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull +from active_toolkit import coroutine, util, enforce + + +_SYNC_DIRNAME = '.sugar-network-sync' + +_logger = logging.getLogger('node.sync_node') + + +class SyncCommands(object): + + def __init__(self, sequences_path): + self._sync = coroutine.Pool() + self._sync_mounts = MutableStack() + self._file_syncs = Leechers(node.sync_dirs.value, sequences_path) + self._sync_session = None + self._push_seq = PersistentSequence( + join(sequences_path, 'push'), [1, None]) + self._pull_seq = PersistentSequence( + join(sequences_path, 'pull'), [1, None]) + self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync') + self._mount = None + + mounts_monitor.connect(_SYNC_DIRNAME, + self.__found_mountcb, self.__lost_mount_cb) + + @property + def node_mount(self): + return self._mount + + @node_mount.setter + def node_mount(self, value): + if self._mount is value: + return + self._mount = value + if self._sync_mounts: + self.start_sync() + + @ad.volume_command(method='POST', cmd='start_sync') + def start_sync(self, rewind=False, path=None): + enforce(self._mount is not None, 'No server to sync') + + if self._sync: + return + + enforce(self._mount is not None, 'No server to synchronize') + enforce(path or self._sync_mounts, 'No mounts to synchronize with') + + if rewind: + self._sync_mounts.rewind() + self._sync.spawn(self.sync_session, path) + + @ad.volume_command(method='POST', cmd='break_sync') + def break_sync(self): + self._sync.kill() + + def sync(self, path, accept_length=None, push_sequence=None, session=None): + enforce(self._mount is not None, 'No server to sync') + + to_push_seq = Sequence(empty_value=[1, None]) + if push_sequence is None: + to_push_seq.include(self._push_seq) + else: + to_push_seq = Sequence(push_sequence) + + if session is None: + session_is_new = True + session = ad.uuid() + else: + session_is_new = False + + while True: + for packet in sneakernet.walk(path): + if packet.header.get('src') == self._mount.node_guid: + if packet.header.get('session') == session: + _logger.debug('Keep current session %r packet', packet) + else: + _logger.debug('Remove our previous %r packet', packet) + os.unlink(packet.path) + else: + self._import(packet, to_push_seq) + self._push_seq.commit() + self._pull_seq.commit() + + if exists(self._sync_script): + shutil.copy(self._sync_script, path) + + with OutFilePacket(path, limit=accept_length, + src=self._mount.node_guid, dst=self._mount.master_guid, + session=session, seqno=self._mount.volume.seqno.value, + api_url=local.api_url.value) as packet: + if session_is_new: + for directory, sync in self._file_syncs.items(): + packet.push(cmd='files_pull', directory=directory, + sequence=sync.sequence) + packet.push(cmd='sn_pull', sequence=self._pull_seq) + + _logger.debug('Generating %r PUSH packet to %r', + packet, packet.path) + self._mount.publish({ + 'event': 'sync_progress', + 'progress': _('Generating %r packet') % packet.basename, + }) + + try: + self._mount.volume.diff(to_push_seq, packet) + except DiskFull: + return {'push_sequence': to_push_seq, 'session': session} + else: + break + + def sync_session(self, path=None): + enforce(self._mount is not None, 'No server to sync') + + _logger.debug('Start synchronization session with %r session ' + 'for %r mounts', self._sync_session, self._sync_mounts) + + def sync(path): + self._mount.publish({'event': 'sync_start', 'path': path}) + self._sync_session = self.sync(path, **(self._sync_session or {})) + return self._sync_session is None + + try: + while True: + if path and sync(path): + break + for mountpoint in self._sync_mounts: + if sync(mountpoint): + break + break + except Exception, error: + util.exception(_logger, 'Failed to complete synchronization') + self._mount.publish({'event': 'sync_error', 'error': str(error)}) + self._sync_session = None + + if self._sync_session is None: + _logger.debug('Synchronization completed') + self._mount.publish({'event': 'sync_complete'}) + else: + _logger.debug('Postpone synchronization with %r session', + self._sync_session) + self._mount.publish({'event': 'sync_continue'}) + + def _import(self, packet, to_push_seq): + self._mount.publish({ + 'event': 'sync_progress', + 'progress': _('Reading %r packet') % basename(packet.path), + }) + _logger.debug('Processing %r PUSH packet from %r', packet, packet.path) + + from_master = (packet.header.get('src') == self._mount.master_guid) + + for record in packet.records(): + cmd = record.get('cmd') + if cmd == 'sn_push': + self._mount.volume.merge(record, increment_seqno=False) + elif from_master: + if cmd == 'sn_commit': + _logger.debug('Processing %r COMMIT from %r', + record, packet) + self._pull_seq.exclude(record['sequence']) + elif cmd == 'sn_ack' and \ + record['dst'] == self._mount.node_guid: + _logger.debug('Processing %r ACK from %r', record, packet) + self._push_seq.exclude(record['sequence']) + self._pull_seq.exclude(record['merged']) + to_push_seq.exclude(record['sequence']) + self._mount.volume.seqno.next() + self._mount.volume.seqno.commit() + elif record.get('directory') in self._file_syncs: + self._file_syncs[record['directory']].push(record) + + def __found_mountcb(self, path): + self._sync_mounts.add(path) + if self._mount is not None: + _logger.debug('Found %r sync mount', path) + self.start_sync() + else: + _logger.debug('Found %r sync mount but no servers', path) + + def __lost_mount_cb(self, path): + self._sync_mounts.remove(join(path, _SYNC_DIRNAME)) + if not self._sync_mounts: + self.break_sync() diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py index 271a6ff..088eb08 100644 --- a/sugar_network/toolkit/files_sync.py +++ b/sugar_network/toolkit/files_sync.py @@ -179,7 +179,7 @@ class Seeders(dict): for path in sync_dirs or []: name = basename(path) - self[name] = Seeder(path, join(index_root, name), seqno) + self[name] = Seeder(path, join(index_root, name + '.files'), seqno) class Leecher(object): @@ -223,4 +223,4 @@ class Leechers(dict): for path in sync_dirs or []: name = basename(path) - self[name] = Leecher(path, join(sequences_root, name)) + self[name] = Leecher(path, join(sequences_root, name + '.files')) diff --git a/sugar_network/toolkit/mounts_monitor.py b/sugar_network/toolkit/mounts_monitor.py index 6c00203..f8507e0 100644 --- a/sugar_network/toolkit/mounts_monitor.py +++ b/sugar_network/toolkit/mounts_monitor.py @@ -15,6 +15,7 @@ import os import logging +from os.path import join, exists from sugar_network.toolkit.inotify import Inotify, \ IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM @@ -26,7 +27,7 @@ _COMPLETE_MOUNT_TIMEOUT = 3 _root = None _jobs = coroutine.Pool() _connects = {} -_found = set() +_found = {} _logger = logging.getLogger('mounts_monitor') @@ -37,63 +38,74 @@ def start(root): return _root = root _logger.info('Start monitoring %r for mounts', _root) - for filename in os.listdir(_root): - _found_mount(filename) + for name in os.listdir(_root): + _found_mount(join(_root, name)) _jobs.spawn(_monitor) def stop(): - if not _jobs: - return - _logger.info('Stop monitoring %r for mounts', _root) - _jobs.kill() + if _jobs: + _logger.info('Stop monitoring %r for mounts', _root) + _jobs.kill() + _connects.clear() + _found.clear() def connect(filename, found_cb, lost_cb): + if filename in _connects: + return _connects[filename] = (found_cb, lost_cb) + for path, filenames in _found.items(): + if exists(join(path, filename)): + filenames.add(filename) + _call(path, filename, 0) + +def _found_mount(path): + _found.setdefault(path, set()) + found = _found[path] + for filename in _connects: + if filename in found or not exists(join(path, filename)): + continue + found.add(filename) + _call(path, filename, 0) -def _found_mount(filename): - if filename not in _connects or filename in _found: + +def _lost_mount(path): + if path not in _found: return - found_cb, __ = _connects[filename] - path = join(_root, filename) - try: - found_cb(path) - except Exception: - util.exception(_logger, 'Cannot process %r mount', path) + for filename in _found.pop(path): + _call(path, filename, 1) -def _lost_mount(filename): - if filename not in _found: +def _call(path, filename, cb): + cb = _connects[filename][cb] + if cb is None: return - __, lost_cb = _connects[filename] - path = join(_root, filename) + _logger.debug('Call %r for %r mount', cb, path) try: - lost_cb(path) + cb(path) except Exception: - util.exception(_logger, 'Cannot process %r unmount', path) + util.exception(_logger, 'Cannot call %r for %r mount', cb, path) def _monitor(): - _logger.info('Start monitoring %r for mounts', root) - with Inotify() as monitor: - monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE | + monitor.add_watch(_root, IN_DELETE_SELF | IN_CREATE | IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM) while not monitor.closed: coroutine.select([monitor.fileno()], [], []) - for filename, event, __ in monitor.read(): + for name, event, __ in monitor.read(): + path = join(_root, name) if event & IN_DELETE_SELF: - _logger.warning('Lost %r, cannot monitor anymore', - root) + _logger.warning('Lost %r, cannot monitor anymore', _root) monitor.close() break elif event & (IN_DELETE | IN_MOVED_FROM): - _lost_mount(filename) + _lost_mount(path) elif event & (IN_CREATE | IN_MOVED_TO): - # Right after moutning, access to directory - # might be restricted; let system enough time - # to complete mounting + # Right after moutning, access to newly mounted directory + # might be restricted; let the system enough time + # to complete mounting routines coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT) - _found_mount(filename) + _found_mount(path) diff --git a/tests/__init__.py b/tests/__init__.py index c3f0b40..f5dfad5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -18,7 +18,7 @@ from M2Crypto import DSA import active_document as ad from active_toolkit import coroutine from sugar_network.client import bus -from sugar_network.toolkit import sugar, http, sneakernet +from sugar_network.toolkit import sugar, http, sneakernet, mounts_monitor from sugar_network.local.bus import IPCServer from sugar_network.local.mounts import HomeMount, RemoteMount from sugar_network.local.mountset import Mountset @@ -74,12 +74,15 @@ class Test(unittest.TestCase): node.tmpdir.value = tmpdir + '/tmp' node.only_commit_events.value = False node.data_root.value = tmpdir + node.sync_dirs.value = [] ad.index_write_queue.value = 10 local.local_root.value = tmpdir local.activity_dirs.value = [tmpdir + '/Activities'] local.api_url.value = 'http://localhost:8800' local.server_mode.value = False local.mounts_root.value = None + mounts_monitor.stop() + mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1 Volume.RESOURCES = [ 'sugar_network.resources.user', diff --git a/tests/units/dbus_client.py b/tests/units/dbus_client.py index 8d9813e..04b2307 100755 --- a/tests/units/dbus_client.py +++ b/tests/units/dbus_client.py @@ -33,7 +33,7 @@ class DbusClientTest(tests.Test): return self.fork(os.execvp, arg0, [arg0, self.id().split('.')[-1], 'fork']) - coroutine.sleep(1) + coroutine.sleep(3) def test_Call(self): client = DBusClient(mountpoint='~') diff --git a/tests/units/mounts_monitor.py b/tests/units/mounts_monitor.py index 3ff5852..bc11f16 100755 --- a/tests/units/mounts_monitor.py +++ b/tests/units/mounts_monitor.py @@ -2,6 +2,7 @@ # sugar-lint: disable import os +import shutil from __init__ import tests @@ -13,12 +14,14 @@ class MountsMonitorTest(tests.Test): def setUp(self): tests.Test.setUp(self) - mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 0 + mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 0.01 def test_Populate(self): - self.touch('mnt/foo') - self.touch('mnt/bar') - self.touch('mnt/fake') + self.touch('mnt/1/foo') + self.touch('mnt/2/foo') + self.touch('mnt/2/bar') + self.touch('mnt/3/fake') + os.makedirs('mnt/4') mounts_monitor.start('mnt') @@ -26,8 +29,8 @@ class MountsMonitorTest(tests.Test): mounts_monitor.connect('foo', found.append, None) mounts_monitor.connect('bar', found.append, None) self.assertEqual( - ['mnt/foo', 'mnt/bar'], - found) + sorted(['mnt/1', 'mnt/2', 'mnt/2']), + sorted(found)) def test_Found(self): os.makedirs('mnt') @@ -38,14 +41,16 @@ class MountsMonitorTest(tests.Test): mounts_monitor.connect('bar', found.append, None) coroutine.dispatch() - self.touch('mnt/foo') - self.touch('mnt/bar') - self.touch('mnt/fake') + self.touch('mnt/1/foo') + self.touch('mnt/2/foo') + self.touch('mnt/2/bar') + self.touch('mnt/3/fake') + os.makedirs('mnt/4') coroutine.sleep(.5) self.assertEqual( - ['mnt/foo', 'mnt/bar'], - found) + sorted(['mnt/1', 'mnt/2', 'mnt/2']), + sorted(found)) def test_Lost(self): os.makedirs('mnt') @@ -57,20 +62,24 @@ class MountsMonitorTest(tests.Test): mounts_monitor.connect('bar', found.append, lost.append) coroutine.dispatch() - self.touch('mnt/foo') - self.touch('mnt/bar') - self.touch('mnt/fake') - os.unlink('mnt/foo') - os.unlink('mnt/bar') - os.unlink('mnt/fake') - coroutine.sleep(.5) + self.touch('mnt/1/foo') + self.touch('mnt/2/foo') + self.touch('mnt/2/bar') + self.touch('mnt/3/fake') + os.makedirs('mnt/4') + coroutine.sleep(.1) + shutil.rmtree('mnt/1') + shutil.rmtree('mnt/2') + shutil.rmtree('mnt/3') + shutil.rmtree('mnt/4') + coroutine.sleep(.1) self.assertEqual( - ['mnt/foo', 'mnt/bar'], - found) + sorted(['mnt/1', 'mnt/2', 'mnt/2']), + sorted(found)) self.assertEqual( - ['mnt/foo', 'mnt/bar'], - lost) + sorted(['mnt/1', 'mnt/2', 'mnt/2']), + sorted(lost)) def test_FoundTimeout(self): mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 2 @@ -81,11 +90,11 @@ class MountsMonitorTest(tests.Test): mounts_monitor.connect('probe', found.append, None) coroutine.dispatch() - self.touch('mnt/probe') + self.touch('mnt/1/probe') coroutine.sleep(1) self.assertEqual([], found) coroutine.sleep(1.5) - self.assertEqual(['mnt/probe'], found) + self.assertEqual(['mnt/1'], found) if __name__ == '__main__': diff --git a/tests/units/mountset.py b/tests/units/mountset.py index ba00908..1073e7c 100755 --- a/tests/units/mountset.py +++ b/tests/units/mountset.py @@ -15,7 +15,7 @@ from sugar_network.local.mountset import Mountset from sugar_network.local.bus import IPCServer from sugar_network.resources.user import User from sugar_network.resources.context import Context -from sugar_network.toolkit import http +from sugar_network.toolkit import http, mounts_monitor from sugar_network import local, Client, ServerError, sugar, node from sugar_network.resources.volume import Volume from sugar_network.local.mounts import HomeMount, RemoteMount @@ -47,6 +47,7 @@ class MountsetTest(tests.Test): mounts.open() mounts.opened.wait() + mounts_monitor.start(tests.tmpdir) # Let `open()` start processing spawned jobs coroutine.dispatch() diff --git a/tests/units/node_mount.py b/tests/units/node_mount.py index 04c54ab..8aa0043 100755 --- a/tests/units/node_mount.py +++ b/tests/units/node_mount.py @@ -15,6 +15,7 @@ from sugar_network.local.mounts import HomeMount from sugar_network.local.mountset import Mountset from sugar_network.local.bus import IPCServer from sugar_network.local import activities +from sugar_network.toolkit import mounts_monitor from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network import local, Client, sugar @@ -53,6 +54,7 @@ class NodeMountTest(tests.Test): Client.connect(events_cb) mounts.open() + mounts_monitor.start(tests.tmpdir) mounts.opened.wait() # Let `open()` start processing spawned jobs coroutine.dispatch() diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py index bff7ce2..a1d9295 100755 --- a/tests/units/sync_master.py +++ b/tests/units/sync_master.py @@ -11,6 +11,7 @@ from __init__ import tests import active_document as ad from active_document.directory import Directory +from sugar_network import node from sugar_network.toolkit.sneakernet import InPacket, OutPacket, OutBufferPacket from sugar_network.toolkit.files_sync import Seeder from sugar_network.node import sync_master @@ -572,8 +573,9 @@ class SyncMasterTest(tests.Test): [i for i in packet]) def test_pull_ProcessFilePulls(self): + node.sync_dirs.value = ['files'] seqno = ad.Seqno('seqno') - master = MasterCommands('master', sync_dirs=['files']) + master = MasterCommands('master') request = Request() response = ad.Response() @@ -700,15 +702,15 @@ class Request(ad.Request): self.environ = environ or {} -class MasterCommands(sync_master.Commands): +class MasterCommands(sync_master.SyncCommands): def __init__(self, master, **kwargs): os.makedirs('db') with file('db/master', 'w') as f: f.write(master) - sync_master.Commands._guid = master - sync_master.Commands.volume = new_volume('db') - sync_master.Commands.__init__(self, **kwargs) + sync_master.SyncCommands._guid = master + sync_master.SyncCommands.volume = new_volume('db') + sync_master.SyncCommands.__init__(self, **kwargs) def new_volume(root): diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py index fc9a049..d943066 100755 --- a/tests/units/sync_node.py +++ b/tests/units/sync_node.py @@ -9,7 +9,8 @@ from __init__ import tests import active_document as ad from sugar_network.toolkit.sneakernet import InPacket, OutFilePacket -from sugar_network.local import mounts, api_url +from sugar_network.local import api_url +from sugar_network.node import sync_node from sugar_network.toolkit import sneakernet from sugar_network.resources.volume import Volume from active_toolkit import coroutine @@ -27,12 +28,12 @@ class SyncNodeTest(tests.Test): return str(self.uuid) def test_Export(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1', prop='value1') node.volume['document'].create(guid='2', prop='value2') - node.sync('sync') + node.sync('mnt') self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, None]]}, {'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': '1', 'diff': { @@ -53,14 +54,14 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, 2]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) def test_Export_NoPullForExistingSession(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1') - node.sync('sync', session='session') + node.sync('mnt', session='session') self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 'session', 'diff': { 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, @@ -72,10 +73,10 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 'session', 'sequence': [[1, 1]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) def test_LimittedExport(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1', prop='*' * 1024) node.volume['document'].create(guid='2', prop='*' * 1024) @@ -84,16 +85,16 @@ class SyncNodeTest(tests.Test): node.volume['document'].create(guid='5', prop='*' * 1024) node.volume['document'].create(guid='6', prop='*' * 1024) - kwargs = node.sync('sync', accept_length=1024, session=0) + kwargs = node.sync('mnt', accept_length=1024, session=0) self.assertEqual(0, kwargs['session']) self.assertEqual([[1, None]], kwargs['push_sequence']) - self.assertEqual([], self.read_packets('sync')) + self.assertEqual([], self.read_packets('mnt')) - kwargs = node.sync('sync', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0) + kwargs = node.sync('mnt', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0) self.assertEqual([[1, None]], kwargs['push_sequence']) - self.assertEqual([], self.read_packets('sync')) + self.assertEqual([], self.read_packets('mnt')) - kwargs = node.sync('sync', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1) + kwargs = node.sync('mnt', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1) self.assertEqual([[2, None]], kwargs['push_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': { @@ -106,9 +107,9 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 1, 'sequence': [[1, 1]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) - kwargs = node.sync('sync', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2) + kwargs = node.sync('mnt', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2) self.assertEqual([[4, None]], kwargs['push_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': { @@ -129,9 +130,9 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 2, 'sequence': [[2, 3]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) - kwargs = node.sync('sync', push_sequence=kwargs['push_sequence'], session=3) + kwargs = node.sync('mnt', push_sequence=kwargs['push_sequence'], session=3) self.assertEqual(None, kwargs) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': { @@ -160,12 +161,12 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 3, 'sequence': [[4, 6]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) def test_Import(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') - master_packet = OutFilePacket('sync', src='master') + master_packet = OutFilePacket('mnt', src='master') master_packet.push(data=[ {'cmd': 'sn_ack', 'dst': 'node', 'sequence': [[1, 2]], 'merged': [[3, 4]]}, {'cmd': 'sn_ack', 'dst': 'other', 'sequence': [[5, 6]], 'merged': [[7, 8]]}, @@ -174,21 +175,21 @@ class SyncNodeTest(tests.Test): ]) master_packet.close() - our_packet = OutFilePacket('sync', src='node', dst='master', session='stale') + our_packet = OutFilePacket('mnt', src='node', dst='master', session='stale') our_packet.push(data=[ {'cmd': 'sn_push', 'document': 'document', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}}, {'cmd': 'sn_commit', 'sequence': [[9, 10]]}, ]) our_packet.close() - other_node_packet = OutFilePacket('sync', src='other', dst='master') + other_node_packet = OutFilePacket('mnt', src='other', dst='master') other_node_packet.push(data=[ {'cmd': 'sn_push', 'document': 'document', 'guid': '3', 'diff': {'guid': {'value': '3', 'mtime': 3}}}, {'cmd': 'sn_commit', 'sequence': [[13, 14]]}, ]) other_node_packet.close() - node.sync('sync', session='new') + node.sync('mnt', session='new') assert exists(master_packet.path) assert exists(other_node_packet.path) @@ -196,28 +197,28 @@ class SyncNodeTest(tests.Test): self.assertEqual( [[3, None]], - json.load(file('db/push.sequence'))) + json.load(file('sync/push'))) self.assertEqual( [[1, 2], [5, 10], [13, None]], - json.load(file('db/pull.sequence'))) + json.load(file('sync/pull'))) self.assertEqual( ['2', '3'], [i.guid for i in node.volume['document'].find()[0]]) def test_TakeIntoAccountJustReadAckPacket(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1', prop='prop1') node.volume['document'].create(guid='2', prop='prop2') node.volume['document'].create(guid='3', prop='prop3') - master_packet = OutFilePacket('sync', src='master') + master_packet = OutFilePacket('mnt', src='master') master_packet.push(data=[ {'cmd': 'sn_ack', 'dst': 'node', 'sequence': [[1, 2]], 'merged': []}, ]) master_packet.close() - node.sync('sync', session='session') + node.sync('mnt', session='session') self.assertEqual([ {'filename': 'master.packet', 'content_type': 'records', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 2]], 'merged': []}, @@ -231,74 +232,73 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-4.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 'session', 'sequence': [[3, 3]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) def test_Import_DoNotDeletePacketsFromCurrentSession(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1') - existing_session = OutFilePacket('sync', src='node', dst='master', session='the same') + existing_session = OutFilePacket('mnt', src='node', dst='master', session='the same') existing_session.push(data=[ {'cmd': 'sn_push', 'document': 'document', 'range': [1, 1], 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}}, ]) existing_session.close() - self.assertEqual(1, len([i for i in sneakernet.walk('sync')])) - node.sync('sync', session='the same') + self.assertEqual(1, len([i for i in sneakernet.walk('mnt')])) + node.sync('mnt', session='the same') self.assertEqual( ['the same', 'the same'], - [i.header['session'] for i in sneakernet.walk('sync')]) + [i.header['session'] for i in sneakernet.walk('mnt')]) assert exists(existing_session.path) - node.sync('sync', session='new one') + node.sync('mnt', session='new one') self.assertEqual( ['new one'], - [i.header['session'] for i in sneakernet.walk('sync')]) + [i.header['session'] for i in sneakernet.walk('mnt')]) def test_sync_session(self): - node = NodeMount('node', 'master') + node = SyncCommands('node', 'master') node.volume['document'].create(guid='1', prop='*' * 1024) coroutine.dispatch() - node.publisher = lambda x: events.append(x) self.override(os, 'statvfs', lambda x: Statvfs(1024 - 512)) - events = [] - node.sync_session(['sync']) + node.events = [] + node.sync_session('mnt') self.assertEqual([ - {'event': 'sync_start', 'path': 'sync'}, + {'event': 'sync_start', 'path': 'mnt'}, {'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"}, {'event': 'sync_continue'}, ], - events) - records = self.read_packets('sync') + node.events) + records = self.read_packets('mnt') self.assertEqual(1, len(records)) self.assertEqual('sn_pull', records[0]['cmd']) session = records[0]['session'] - events = [] - node.sync_session(['sync']) + node.events = [] + node.sync_session('mnt') self.assertEqual([ - {'path': 'sync', 'event': 'sync_start'}, + {'path': 'mnt', 'event': 'sync_start'}, {'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"}, {'event': 'sync_continue'}, ], - events) + node.events) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) self.override(os, 'statvfs', lambda x: Statvfs(1024 + 512)) - events = [] - node.sync_session(['sync']) + node.events = [] + node.sync_session('mnt') self.assertEqual([ - {'path': 'sync', 'event': 'sync_start'}, + {'path': 'mnt', 'event': 'sync_start'}, {'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"}, {'event': 'sync_complete'}, ], - events) + node.events) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]}, {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': session, 'diff': { @@ -311,7 +311,7 @@ class SyncNodeTest(tests.Test): }}, {'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, 1]]}, ], - self.read_packets('sync')) + self.read_packets('mnt')) def read_packets(self, path): result = [] @@ -330,15 +330,18 @@ class Statvfs(object): self.f_bfree = f_bfree -class NodeMount(mounts.NodeMount): +class SyncCommands(sync_node.SyncCommands): def __init__(self, node, master): - os.makedirs('db') - with file('db/node', 'w') as f: - f.write(node) - with file('db/master', 'w') as f: - f.write(master) - mounts.NodeMount.__init__(self, new_volume('db'), None) + sync_node.SyncCommands.__init__(self, 'sync') + self.node_guid = node + self.master_guid = master + self.volume = new_volume('db') + self.node_mount = self + self.events = [] + + def publish(self, event): + self.events.append(event) def new_volume(root): |