Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-08-08 12:34:17 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-08-08 12:34:17 (GMT)
commitc8ce1a074715d809dd3ec435978113e5b0c30ed7 (patch)
treec391a1064d9103fa94872319a5332fc7a9dc09b3
parente297a16dce2c17a6ca6b1af5257008365b201b0f (diff)
Keep node sync code out of local/
-rwxr-xr-xsugar-network-server2
-rwxr-xr-xsugar-network-service9
-rw-r--r--sugar_network/local/mounts.py136
-rw-r--r--sugar_network/local/mountset.py140
-rw-r--r--sugar_network/node/commands.py8
-rw-r--r--sugar_network/node/sync_master.py8
-rw-r--r--sugar_network/node/sync_node.py212
-rw-r--r--sugar_network/toolkit/files_sync.py4
-rw-r--r--sugar_network/toolkit/mounts_monitor.py76
-rw-r--r--tests/__init__.py5
-rwxr-xr-xtests/units/dbus_client.py2
-rwxr-xr-xtests/units/mounts_monitor.py57
-rwxr-xr-xtests/units/mountset.py3
-rwxr-xr-xtests/units/node_mount.py2
-rwxr-xr-xtests/units/sync_master.py12
-rwxr-xr-xtests/units/sync_node.py123
16 files changed, 424 insertions, 375 deletions
diff --git a/sugar-network-server b/sugar-network-server
index 928fe92..1878faf 100755
--- a/sugar-network-server
+++ b/sugar-network-server
@@ -55,7 +55,7 @@ class Application(application.Daemon):
self.jobs.spawn(volume.populate)
subscriber = SubscribeSocket(volume,
node.host.value, node.subscribe_port.value)
- cp = MasterCommands(volume, subscriber, node.sync_dirs.value)
+ cp = MasterCommands(volume, subscriber)
logging.info('Listening for requests on %s:%s',
node.host.value, node.port.value)
diff --git a/sugar-network-service b/sugar-network-service
index 4e2b18c..25bcb55 100755
--- a/sugar-network-service
+++ b/sugar-network-service
@@ -26,7 +26,7 @@ import sugar_network_webui as webui
from sugar_network import local, node, Client
from sugar_network.toolkit import sugar, sneakernet
-from sugar_network.toolkit import crypto, dbus_thread
+from sugar_network.toolkit import crypto, dbus_thread, mounts_monitor
from sugar_network.local import activities, datastore
from sugar_network.local.dbus_datastore import Datastore
from sugar_network.local.dbus_network import Network
@@ -71,7 +71,6 @@ class Application(application.Application):
new_root = (local.local_root.value != local.local_root.default)
local.local_root.value = abspath(local.local_root.value)
- local.mounts_root.value = abspath(local.mounts_root.value)
if new_root:
application.logdir.value = join(local.local_root.value, 'log')
@@ -151,7 +150,7 @@ class Application(application.Application):
jobs = coroutine.Pool()
volume = Volume(self._db_path, lazy_open=local.lazy_open.value)
- mountset = Mountset(volume, node.sync_dirs.value)
+ mountset = Mountset(volume)
mountset['~'] = HomeMount(volume)
mountset['/'] = RemoteMount(volume)
@@ -182,6 +181,9 @@ class Application(application.Application):
server = coroutine.WSGIServer(host, webui.get_app())
jobs.spawn(server.serve_forever)
+ if local.mounts_root.value:
+ mounts_monitor.start(abspath(local.mounts_root.value))
+
if local.delayed_start.value:
mountset.connect(delayed_start, event='delayed-start')
else:
@@ -200,6 +202,7 @@ class Application(application.Application):
except Exception:
util.exception('%s aborted', self.name)
finally:
+ mounts_monitor.stop()
jobs.kill()
mountset.close()
diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py
index 57ae353..6d75740 100644
--- a/sugar_network/local/mounts.py
+++ b/sugar_network/local/mounts.py
@@ -14,20 +14,17 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-import sys
import json
import shutil
import logging
from urlparse import urlparse
-from os.path import isabs, exists, join, basename, dirname
+from os.path import isabs, exists, join, basename
from gettext import gettext as _
import sweets_recipe
import active_document as ad
from sweets_recipe import Bundle
-from sugar_network.toolkit.collection import Sequence, PersistentSequence
-from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull
-from sugar_network.toolkit import sugar, http, sneakernet
+from sugar_network.toolkit import sugar, http
from sugar_network.local import activities, cache
from sugar_network import local, checkin, sugar
from active_toolkit import sockets, util, coroutine, enforce
@@ -416,23 +413,23 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
class NodeMount(LocalMount, _ProxyCommands):
- def __init__(self, volume, home_volume, file_syncs=None):
+ def __init__(self, volume, home_volume):
LocalMount.__init__(self, volume)
_ProxyCommands.__init__(self, home_volume)
- self._file_syncs = file_syncs or {}
- self._push_seq = PersistentSequence(
- join(volume.root, 'push.sequence'), [1, None])
- self._pull_seq = PersistentSequence(
- join(volume.root, 'pull.sequence'), [1, None])
- self._sync_session = None
- self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
-
with file(join(volume.root, 'node')) as f:
self._node_guid = f.read().strip()
with file(join(volume.root, 'master')) as f:
self._master_guid = f.read().strip()
+ @property
+ def node_guid(self):
+ return self._node_guid
+
+ @property
+ def master_guid(self):
+ return self._master_guid
+
def call(self, request, response):
return self._proxy_call(request, response, super(NodeMount, self).call)
@@ -453,114 +450,3 @@ class NodeMount(LocalMount, _ProxyCommands):
self._node_guid, extract)
return meta
-
- def sync(self, path, accept_length=None, push_sequence=None, session=None):
- to_push_seq = Sequence(empty_value=[1, None])
- if push_sequence is None:
- to_push_seq.include(self._push_seq)
- else:
- to_push_seq = Sequence(push_sequence)
-
- if session is None:
- session_is_new = True
- session = ad.uuid()
- else:
- session_is_new = False
-
- while True:
- for packet in sneakernet.walk(path):
- if packet.header.get('src') == self._node_guid:
- if packet.header.get('session') == session:
- _logger.debug('Keep current session %r packet', packet)
- else:
- _logger.debug('Remove our previous %r packet', packet)
- os.unlink(packet.path)
- else:
- self._import(packet, to_push_seq)
- self._push_seq.commit()
- self._pull_seq.commit()
-
- if exists(self._sync_script):
- shutil.copy(self._sync_script, path)
-
- with OutFilePacket(path, limit=accept_length, src=self._node_guid,
- dst=self._master_guid, session=session,
- seqno=self.volume.seqno.value,
- api_url=local.api_url.value) as packet:
- if session_is_new:
- for directory, sync in self._file_syncs.items():
- packet.push(cmd='files_pull', directory=directory,
- sequence=sync.sequence)
- packet.push(cmd='sn_pull', sequence=self._pull_seq)
-
- _logger.debug('Generating %r PUSH packet to %r',
- packet, packet.path)
- self.publish({
- 'event': 'sync_progress',
- 'progress': _('Generating %r packet') % packet.basename,
- })
-
- try:
- self.volume.diff(to_push_seq, packet)
- except DiskFull:
- return {'push_sequence': to_push_seq, 'session': session}
- else:
- break
-
- def sync_session(self, mounts, path=None):
- _logger.debug('Start synchronization session with %r session '
- 'for %r mounts', self._sync_session, mounts)
-
- def sync(path):
- self.publish({'event': 'sync_start', 'path': path})
- self._sync_session = self.sync(path, **(self._sync_session or {}))
- return self._sync_session is None
-
- try:
- while True:
- if path and sync(path):
- break
- for mountpoint in mounts:
- if sync(mountpoint):
- break
- break
- except Exception, error:
- util.exception(_logger, 'Failed to complete synchronization')
- self.publish({'event': 'sync_error', 'error': str(error)})
- self._sync_session = None
-
- if self._sync_session is None:
- _logger.debug('Synchronization completed')
- self.publish({'event': 'sync_complete'})
- else:
- _logger.debug('Postpone synchronization with %r session',
- self._sync_session)
- self.publish({'event': 'sync_continue'})
-
- def _import(self, packet, to_push_seq):
- self.publish({
- 'event': 'sync_progress',
- 'progress': _('Reading %r packet') % basename(packet.path),
- })
- _logger.debug('Processing %r PUSH packet from %r', packet, packet.path)
-
- from_master = (packet.header.get('src') == self._master_guid)
-
- for record in packet.records():
- cmd = record.get('cmd')
- if cmd == 'sn_push':
- self.volume.merge(record, increment_seqno=False)
- elif from_master:
- if cmd == 'sn_commit':
- _logger.debug('Processing %r COMMIT from %r',
- record, packet)
- self._pull_seq.exclude(record['sequence'])
- elif cmd == 'sn_ack' and record['dst'] == self._node_guid:
- _logger.debug('Processing %r ACK from %r', record, packet)
- self._push_seq.exclude(record['sequence'])
- self._pull_seq.exclude(record['merged'])
- to_push_seq.exclude(record['sequence'])
- self.volume.seqno.next()
- self.volume.seqno.commit()
- elif record.get('directory') in self._file_syncs:
- self._file_syncs[record['directory']].push(record)
diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py
index ae622a3..e9e081e 100644
--- a/sugar_network/local/mountset.py
+++ b/sugar_network/local/mountset.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import locale
import socket
import logging
@@ -21,48 +20,35 @@ from os.path import join, exists
import active_document as ad
-from sugar_network.toolkit.inotify import Inotify, \
- IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM
from sugar_network import local, node
-from sugar_network.toolkit import zeroconf, netlink, network
-from sugar_network.toolkit.collection import MutableStack
-from sugar_network.toolkit.files_sync import Leechers
+from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor
from sugar_network.local.mounts import LocalMount, NodeMount
from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.node.commands import NodeCommands
+from sugar_network.node.sync_node import SyncCommands
from sugar_network.node.router import Router
from sugar_network.resources.volume import Volume
from active_toolkit import util, coroutine, enforce
_DB_DIRNAME = '.sugar-network'
-_SYNC_DIRNAME = '.sugar-network-sync'
-
-_COMPLETE_MOUNT_TIMEOUT = 3
_logger = logging.getLogger('local.mountset')
-class Mountset(dict, ad.CommandsProcessor):
+class Mountset(dict, ad.CommandsProcessor, SyncCommands):
- def __init__(self, home_volume, sync_dirs=None):
+ def __init__(self, home_volume):
dict.__init__(self)
ad.CommandsProcessor.__init__(self)
+ SyncCommands.__init__(self, local.path('sync'))
self.opened = coroutine.Event()
-
self.home_volume = home_volume
- if sync_dirs is None:
- self._file_syncs = {}
- else:
- self._file_syncs = Leechers(sync_dirs,
- join(home_volume.root, 'files'))
self._subscriptions = {}
self._locale = locale.getdefaultlocale()[0].replace('_', '-')
self._jobs = coroutine.Pool()
self._servers = coroutine.Pool()
- self._sync_dirs = MutableStack()
- self._sync = coroutine.Pool()
def __getitem__(self, mountpoint):
enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint)
@@ -100,26 +86,6 @@ class Mountset(dict, ad.CommandsProcessor):
mount.set_mounted(True)
return mount.mounted
- @ad.volume_command(method='POST', cmd='start_sync')
- def start_sync(self, rewind=False, path=None):
- if self._sync:
- return
-
- enforce(path or self._sync_dirs, 'No mounts to synchronize with')
-
- for mount in self.values():
- if isinstance(mount, NodeMount):
- if rewind:
- self._sync_dirs.rewind()
- self._sync.spawn(mount.sync_session, self._sync_dirs, path)
- break
- else:
- raise RuntimeError('No mounted servers')
-
- @ad.volume_command(method='POST', cmd='break_sync')
- def break_sync(self):
- self._sync.kill()
-
@ad.volume_command(method='POST', cmd='publish')
def republish(self, request):
self.publish(request.content)
@@ -128,28 +94,24 @@ class Mountset(dict, ad.CommandsProcessor):
if response is None:
response = ad.Response()
request.accept_language = [self._locale]
- mountpoint = None
+ mountpoint = request.get('mountpoint')
- def process_call():
+ try:
try:
- return ad.CommandsProcessor.call(self, request, response)
+ result = ad.CommandsProcessor.call(self, request, response)
except ad.CommandNotFound:
- mountpoint = request.pop('mountpoint')
+ request.pop('mountpoint')
mount = self[mountpoint]
if mountpoint == '/':
mount.set_mounted(True)
enforce(mount.mounted, '%r is not mounted', mountpoint)
- return mount.call(request, response)
-
- try:
- result = process_call()
+ result = mount.call(request, response)
except Exception:
- util.exception(_logger,
- 'Failed to call %s on %r', request, mountpoint)
+ util.exception(_logger, 'Failed to call %s on %r',
+ request, mountpoint)
raise
- else:
- _logger.debug('Called %s on %r: %r', request, mountpoint, result)
+ _logger.debug('Called %s on %r: %r', request, mountpoint, result)
return result
def connect(self, callback, condition=None, **kwargs):
@@ -172,16 +134,8 @@ class Mountset(dict, ad.CommandsProcessor):
def open(self):
try:
- mounts_root = local.mounts_root.value
- if mounts_root:
- for filename in os.listdir(mounts_root):
- self._found_mount(join(mounts_root, filename))
- # In case if sync mounts processed before server mounts
- # TODO More obvious code
- for filename in os.listdir(mounts_root):
- self._found_mount(join(mounts_root, filename))
- self._jobs.spawn(self._mounts_monitor)
-
+ mounts_monitor.connect(_DB_DIRNAME,
+ self._found_mount, self._lost_mount)
if '/' in self:
if local.api_url.value:
crawler = self._wait_for_master
@@ -217,61 +171,23 @@ class Mountset(dict, ad.CommandsProcessor):
# Otherwise, `socket.gethostbyname()` will return stale resolve
network.res_init()
- def _mounts_monitor(self):
- root = local.mounts_root.value
- _logger.info('Start monitoring %r for mounts', root)
-
- with Inotify() as monitor:
- monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE |
- IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM)
- while not monitor.closed:
- coroutine.select([monitor.fileno()], [], [])
- for filename, event, __ in monitor.read():
- path = join(root, filename)
- try:
- if event & IN_DELETE_SELF:
- _logger.warning('Lost %r, cannot monitor anymore',
- root)
- monitor.close()
- break
- elif event & (IN_DELETE | IN_MOVED_FROM):
- self._lost_mount(path)
- elif event & (IN_CREATE | IN_MOVED_TO):
- # Right after moutning, access to directory
- # might be restricted; let system enough time
- # to complete mounting
- coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT)
- self._found_mount(path)
- except Exception:
- util.exception(_logger, 'Mount %r failed', path)
-
def _found_mount(self, path):
- if exists(join(path, _DB_DIRNAME)) and path not in self:
- _logger.debug('Found %r server mount', path)
- volume, server_mode = self._mount_volume(path)
- if server_mode:
- self[path] = NodeMount(volume, self.home_volume,
- self._file_syncs)
- else:
- self[path] = LocalMount(volume)
-
- if exists(join(path, _SYNC_DIRNAME)):
- self._sync_dirs.add(path)
- if self._servers:
- _logger.debug('Found %r sync mount', path)
- self.start_sync()
- else:
- _logger.debug('Found %r sync mount but no servers', path)
+ volume, server_mode = self._mount_volume(path)
+ if server_mode:
+ _logger.debug('Mount %r in node mode', path)
+ self[path] = self.node_mount = NodeMount(volume, self.home_volume)
+ else:
+ _logger.debug('Mount %r in node-less mode', path)
+ self[path] = LocalMount(volume)
def _lost_mount(self, path):
+ mount = self.get(path)
+ if mount is None:
+ return
_logger.debug('Lost %r mount', path)
-
- self._sync_dirs.remove(join(path, _SYNC_DIRNAME))
- if not self._sync_dirs:
- self.break_sync()
-
- if path in self:
- del self[path]
+ if isinstance(mount, NodeMount):
+ self.node_mount = None
+ del self[path]
def _mount_volume(self, path):
lazy_open = local.lazy_open.value
diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py
index 29d721b..617ab63 100644
--- a/sugar_network/node/commands.py
+++ b/sugar_network/node/commands.py
@@ -20,7 +20,7 @@ from os.path import exists, join
import active_document as ad
from sugar_network import node
-from sugar_network.node import sync_master
+from sugar_network.node.sync_master import SyncCommands
from active_toolkit import util, enforce
@@ -137,11 +137,11 @@ class NodeCommands(ad.VolumeCommands):
props['author'] = authors
-class MasterCommands(NodeCommands, sync_master.Commands):
+class MasterCommands(NodeCommands, SyncCommands):
- def __init__(self, volume, subscriber=None, sync_dirs=None):
+ def __init__(self, volume, subscriber=None):
NodeCommands.__init__(self, volume, subscriber)
- sync_master.Commands.__init__(self, sync_dirs)
+ SyncCommands.__init__(self)
def _load_pubkey(pubkey):
diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py
index 2d4fcb1..c1016a0 100644
--- a/sugar_network/node/sync_master.py
+++ b/sugar_network/node/sync_master.py
@@ -37,14 +37,14 @@ _PULL_QUEUE_SIZE = 256
_logger = logging.getLogger('node.sync_master')
-class Commands(object):
+class SyncCommands(object):
_guid = None
volume = None
- def __init__(self, sync_dirs=None):
- self._file_syncs = Seeders(sync_dirs,
- join(node.data_root.value, 'files'), self.volume.seqno)
+ def __init__(self):
+ self._file_syncs = Seeders(node.sync_dirs.value,
+ join(node.data_root.value, 'sync'), self.volume.seqno)
self._pull_queue = lrucache(_PULL_QUEUE_SIZE,
lambda key, pull: pull.unlink())
diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py
new file mode 100644
index 0000000..c887ccd
--- /dev/null
+++ b/sugar_network/node/sync_node.py
@@ -0,0 +1,212 @@
+# Copyright (C) 2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import shutil
+import logging
+from os.path import join, dirname, exists, basename
+from gettext import gettext as _
+
+import active_document as ad
+from sugar_network import node, local
+from sugar_network.toolkit import mounts_monitor, sneakernet
+from sugar_network.toolkit.collection import MutableStack
+from sugar_network.toolkit.files_sync import Leechers
+from sugar_network.toolkit.collection import Sequence, PersistentSequence
+from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull
+from active_toolkit import coroutine, util, enforce
+
+
+_SYNC_DIRNAME = '.sugar-network-sync'
+
+_logger = logging.getLogger('node.sync_node')
+
+
+class SyncCommands(object):
+
+ def __init__(self, sequences_path):
+ self._sync = coroutine.Pool()
+ self._sync_mounts = MutableStack()
+ self._file_syncs = Leechers(node.sync_dirs.value, sequences_path)
+ self._sync_session = None
+ self._push_seq = PersistentSequence(
+ join(sequences_path, 'push'), [1, None])
+ self._pull_seq = PersistentSequence(
+ join(sequences_path, 'pull'), [1, None])
+ self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
+ self._mount = None
+
+ mounts_monitor.connect(_SYNC_DIRNAME,
+ self.__found_mountcb, self.__lost_mount_cb)
+
+ @property
+ def node_mount(self):
+ return self._mount
+
+ @node_mount.setter
+ def node_mount(self, value):
+ if self._mount is value:
+ return
+ self._mount = value
+ if self._sync_mounts:
+ self.start_sync()
+
+ @ad.volume_command(method='POST', cmd='start_sync')
+ def start_sync(self, rewind=False, path=None):
+ enforce(self._mount is not None, 'No server to sync')
+
+ if self._sync:
+ return
+
+ enforce(self._mount is not None, 'No server to synchronize')
+ enforce(path or self._sync_mounts, 'No mounts to synchronize with')
+
+ if rewind:
+ self._sync_mounts.rewind()
+ self._sync.spawn(self.sync_session, path)
+
+ @ad.volume_command(method='POST', cmd='break_sync')
+ def break_sync(self):
+ self._sync.kill()
+
+ def sync(self, path, accept_length=None, push_sequence=None, session=None):
+ enforce(self._mount is not None, 'No server to sync')
+
+ to_push_seq = Sequence(empty_value=[1, None])
+ if push_sequence is None:
+ to_push_seq.include(self._push_seq)
+ else:
+ to_push_seq = Sequence(push_sequence)
+
+ if session is None:
+ session_is_new = True
+ session = ad.uuid()
+ else:
+ session_is_new = False
+
+ while True:
+ for packet in sneakernet.walk(path):
+ if packet.header.get('src') == self._mount.node_guid:
+ if packet.header.get('session') == session:
+ _logger.debug('Keep current session %r packet', packet)
+ else:
+ _logger.debug('Remove our previous %r packet', packet)
+ os.unlink(packet.path)
+ else:
+ self._import(packet, to_push_seq)
+ self._push_seq.commit()
+ self._pull_seq.commit()
+
+ if exists(self._sync_script):
+ shutil.copy(self._sync_script, path)
+
+ with OutFilePacket(path, limit=accept_length,
+ src=self._mount.node_guid, dst=self._mount.master_guid,
+ session=session, seqno=self._mount.volume.seqno.value,
+ api_url=local.api_url.value) as packet:
+ if session_is_new:
+ for directory, sync in self._file_syncs.items():
+ packet.push(cmd='files_pull', directory=directory,
+ sequence=sync.sequence)
+ packet.push(cmd='sn_pull', sequence=self._pull_seq)
+
+ _logger.debug('Generating %r PUSH packet to %r',
+ packet, packet.path)
+ self._mount.publish({
+ 'event': 'sync_progress',
+ 'progress': _('Generating %r packet') % packet.basename,
+ })
+
+ try:
+ self._mount.volume.diff(to_push_seq, packet)
+ except DiskFull:
+ return {'push_sequence': to_push_seq, 'session': session}
+ else:
+ break
+
+ def sync_session(self, path=None):
+ enforce(self._mount is not None, 'No server to sync')
+
+ _logger.debug('Start synchronization session with %r session '
+ 'for %r mounts', self._sync_session, self._sync_mounts)
+
+ def sync(path):
+ self._mount.publish({'event': 'sync_start', 'path': path})
+ self._sync_session = self.sync(path, **(self._sync_session or {}))
+ return self._sync_session is None
+
+ try:
+ while True:
+ if path and sync(path):
+ break
+ for mountpoint in self._sync_mounts:
+ if sync(mountpoint):
+ break
+ break
+ except Exception, error:
+ util.exception(_logger, 'Failed to complete synchronization')
+ self._mount.publish({'event': 'sync_error', 'error': str(error)})
+ self._sync_session = None
+
+ if self._sync_session is None:
+ _logger.debug('Synchronization completed')
+ self._mount.publish({'event': 'sync_complete'})
+ else:
+ _logger.debug('Postpone synchronization with %r session',
+ self._sync_session)
+ self._mount.publish({'event': 'sync_continue'})
+
+ def _import(self, packet, to_push_seq):
+ self._mount.publish({
+ 'event': 'sync_progress',
+ 'progress': _('Reading %r packet') % basename(packet.path),
+ })
+ _logger.debug('Processing %r PUSH packet from %r', packet, packet.path)
+
+ from_master = (packet.header.get('src') == self._mount.master_guid)
+
+ for record in packet.records():
+ cmd = record.get('cmd')
+ if cmd == 'sn_push':
+ self._mount.volume.merge(record, increment_seqno=False)
+ elif from_master:
+ if cmd == 'sn_commit':
+ _logger.debug('Processing %r COMMIT from %r',
+ record, packet)
+ self._pull_seq.exclude(record['sequence'])
+ elif cmd == 'sn_ack' and \
+ record['dst'] == self._mount.node_guid:
+ _logger.debug('Processing %r ACK from %r', record, packet)
+ self._push_seq.exclude(record['sequence'])
+ self._pull_seq.exclude(record['merged'])
+ to_push_seq.exclude(record['sequence'])
+ self._mount.volume.seqno.next()
+ self._mount.volume.seqno.commit()
+ elif record.get('directory') in self._file_syncs:
+ self._file_syncs[record['directory']].push(record)
+
+ def __found_mountcb(self, path):
+ self._sync_mounts.add(path)
+ if self._mount is not None:
+ _logger.debug('Found %r sync mount', path)
+ self.start_sync()
+ else:
+ _logger.debug('Found %r sync mount but no servers', path)
+
+ def __lost_mount_cb(self, path):
+ self._sync_mounts.remove(join(path, _SYNC_DIRNAME))
+ if not self._sync_mounts:
+ self.break_sync()
diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py
index 271a6ff..088eb08 100644
--- a/sugar_network/toolkit/files_sync.py
+++ b/sugar_network/toolkit/files_sync.py
@@ -179,7 +179,7 @@ class Seeders(dict):
for path in sync_dirs or []:
name = basename(path)
- self[name] = Seeder(path, join(index_root, name), seqno)
+ self[name] = Seeder(path, join(index_root, name + '.files'), seqno)
class Leecher(object):
@@ -223,4 +223,4 @@ class Leechers(dict):
for path in sync_dirs or []:
name = basename(path)
- self[name] = Leecher(path, join(sequences_root, name))
+ self[name] = Leecher(path, join(sequences_root, name + '.files'))
diff --git a/sugar_network/toolkit/mounts_monitor.py b/sugar_network/toolkit/mounts_monitor.py
index 6c00203..f8507e0 100644
--- a/sugar_network/toolkit/mounts_monitor.py
+++ b/sugar_network/toolkit/mounts_monitor.py
@@ -15,6 +15,7 @@
import os
import logging
+from os.path import join, exists
from sugar_network.toolkit.inotify import Inotify, \
IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM
@@ -26,7 +27,7 @@ _COMPLETE_MOUNT_TIMEOUT = 3
_root = None
_jobs = coroutine.Pool()
_connects = {}
-_found = set()
+_found = {}
_logger = logging.getLogger('mounts_monitor')
@@ -37,63 +38,74 @@ def start(root):
return
_root = root
_logger.info('Start monitoring %r for mounts', _root)
- for filename in os.listdir(_root):
- _found_mount(filename)
+ for name in os.listdir(_root):
+ _found_mount(join(_root, name))
_jobs.spawn(_monitor)
def stop():
- if not _jobs:
- return
- _logger.info('Stop monitoring %r for mounts', _root)
- _jobs.kill()
+ if _jobs:
+ _logger.info('Stop monitoring %r for mounts', _root)
+ _jobs.kill()
+ _connects.clear()
+ _found.clear()
def connect(filename, found_cb, lost_cb):
+ if filename in _connects:
+ return
_connects[filename] = (found_cb, lost_cb)
+ for path, filenames in _found.items():
+ if exists(join(path, filename)):
+ filenames.add(filename)
+ _call(path, filename, 0)
+
+def _found_mount(path):
+ _found.setdefault(path, set())
+ found = _found[path]
+ for filename in _connects:
+ if filename in found or not exists(join(path, filename)):
+ continue
+ found.add(filename)
+ _call(path, filename, 0)
-def _found_mount(filename):
- if filename not in _connects or filename in _found:
+
+def _lost_mount(path):
+ if path not in _found:
return
- found_cb, __ = _connects[filename]
- path = join(_root, filename)
- try:
- found_cb(path)
- except Exception:
- util.exception(_logger, 'Cannot process %r mount', path)
+ for filename in _found.pop(path):
+ _call(path, filename, 1)
-def _lost_mount(filename):
- if filename not in _found:
+def _call(path, filename, cb):
+ cb = _connects[filename][cb]
+ if cb is None:
return
- __, lost_cb = _connects[filename]
- path = join(_root, filename)
+ _logger.debug('Call %r for %r mount', cb, path)
try:
- lost_cb(path)
+ cb(path)
except Exception:
- util.exception(_logger, 'Cannot process %r unmount', path)
+ util.exception(_logger, 'Cannot call %r for %r mount', cb, path)
def _monitor():
- _logger.info('Start monitoring %r for mounts', root)
-
with Inotify() as monitor:
- monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE |
+ monitor.add_watch(_root, IN_DELETE_SELF | IN_CREATE |
IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM)
while not monitor.closed:
coroutine.select([monitor.fileno()], [], [])
- for filename, event, __ in monitor.read():
+ for name, event, __ in monitor.read():
+ path = join(_root, name)
if event & IN_DELETE_SELF:
- _logger.warning('Lost %r, cannot monitor anymore',
- root)
+ _logger.warning('Lost %r, cannot monitor anymore', _root)
monitor.close()
break
elif event & (IN_DELETE | IN_MOVED_FROM):
- _lost_mount(filename)
+ _lost_mount(path)
elif event & (IN_CREATE | IN_MOVED_TO):
- # Right after moutning, access to directory
- # might be restricted; let system enough time
- # to complete mounting
+ # Right after mounting, access to newly mounted directory
+ # might be restricted; give the system enough time
+ # to complete mounting routines
coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT)
- _found_mount(filename)
+ _found_mount(path)
diff --git a/tests/__init__.py b/tests/__init__.py
index c3f0b40..f5dfad5 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -18,7 +18,7 @@ from M2Crypto import DSA
import active_document as ad
from active_toolkit import coroutine
from sugar_network.client import bus
-from sugar_network.toolkit import sugar, http, sneakernet
+from sugar_network.toolkit import sugar, http, sneakernet, mounts_monitor
from sugar_network.local.bus import IPCServer
from sugar_network.local.mounts import HomeMount, RemoteMount
from sugar_network.local.mountset import Mountset
@@ -74,12 +74,15 @@ class Test(unittest.TestCase):
node.tmpdir.value = tmpdir + '/tmp'
node.only_commit_events.value = False
node.data_root.value = tmpdir
+ node.sync_dirs.value = []
ad.index_write_queue.value = 10
local.local_root.value = tmpdir
local.activity_dirs.value = [tmpdir + '/Activities']
local.api_url.value = 'http://localhost:8800'
local.server_mode.value = False
local.mounts_root.value = None
+ mounts_monitor.stop()
+ mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1
Volume.RESOURCES = [
'sugar_network.resources.user',
diff --git a/tests/units/dbus_client.py b/tests/units/dbus_client.py
index 8d9813e..04b2307 100755
--- a/tests/units/dbus_client.py
+++ b/tests/units/dbus_client.py
@@ -33,7 +33,7 @@ class DbusClientTest(tests.Test):
return
self.fork(os.execvp, arg0, [arg0, self.id().split('.')[-1], 'fork'])
- coroutine.sleep(1)
+ coroutine.sleep(3)
def test_Call(self):
client = DBusClient(mountpoint='~')
diff --git a/tests/units/mounts_monitor.py b/tests/units/mounts_monitor.py
index 3ff5852..bc11f16 100755
--- a/tests/units/mounts_monitor.py
+++ b/tests/units/mounts_monitor.py
@@ -2,6 +2,7 @@
# sugar-lint: disable
import os
+import shutil
from __init__ import tests
@@ -13,12 +14,14 @@ class MountsMonitorTest(tests.Test):
def setUp(self):
tests.Test.setUp(self)
- mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 0
+ mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 0.01
def test_Populate(self):
- self.touch('mnt/foo')
- self.touch('mnt/bar')
- self.touch('mnt/fake')
+ self.touch('mnt/1/foo')
+ self.touch('mnt/2/foo')
+ self.touch('mnt/2/bar')
+ self.touch('mnt/3/fake')
+ os.makedirs('mnt/4')
mounts_monitor.start('mnt')
@@ -26,8 +29,8 @@ class MountsMonitorTest(tests.Test):
mounts_monitor.connect('foo', found.append, None)
mounts_monitor.connect('bar', found.append, None)
self.assertEqual(
- ['mnt/foo', 'mnt/bar'],
- found)
+ sorted(['mnt/1', 'mnt/2', 'mnt/2']),
+ sorted(found))
def test_Found(self):
os.makedirs('mnt')
@@ -38,14 +41,16 @@ class MountsMonitorTest(tests.Test):
mounts_monitor.connect('bar', found.append, None)
coroutine.dispatch()
- self.touch('mnt/foo')
- self.touch('mnt/bar')
- self.touch('mnt/fake')
+ self.touch('mnt/1/foo')
+ self.touch('mnt/2/foo')
+ self.touch('mnt/2/bar')
+ self.touch('mnt/3/fake')
+ os.makedirs('mnt/4')
coroutine.sleep(.5)
self.assertEqual(
- ['mnt/foo', 'mnt/bar'],
- found)
+ sorted(['mnt/1', 'mnt/2', 'mnt/2']),
+ sorted(found))
def test_Lost(self):
os.makedirs('mnt')
@@ -57,20 +62,24 @@ class MountsMonitorTest(tests.Test):
mounts_monitor.connect('bar', found.append, lost.append)
coroutine.dispatch()
- self.touch('mnt/foo')
- self.touch('mnt/bar')
- self.touch('mnt/fake')
- os.unlink('mnt/foo')
- os.unlink('mnt/bar')
- os.unlink('mnt/fake')
- coroutine.sleep(.5)
+ self.touch('mnt/1/foo')
+ self.touch('mnt/2/foo')
+ self.touch('mnt/2/bar')
+ self.touch('mnt/3/fake')
+ os.makedirs('mnt/4')
+ coroutine.sleep(.1)
+ shutil.rmtree('mnt/1')
+ shutil.rmtree('mnt/2')
+ shutil.rmtree('mnt/3')
+ shutil.rmtree('mnt/4')
+ coroutine.sleep(.1)
self.assertEqual(
- ['mnt/foo', 'mnt/bar'],
- found)
+ sorted(['mnt/1', 'mnt/2', 'mnt/2']),
+ sorted(found))
self.assertEqual(
- ['mnt/foo', 'mnt/bar'],
- lost)
+ sorted(['mnt/1', 'mnt/2', 'mnt/2']),
+ sorted(lost))
def test_FoundTimeout(self):
mounts_monitor._COMPLETE_MOUNT_TIMEOUT = 2
@@ -81,11 +90,11 @@ class MountsMonitorTest(tests.Test):
mounts_monitor.connect('probe', found.append, None)
coroutine.dispatch()
- self.touch('mnt/probe')
+ self.touch('mnt/1/probe')
coroutine.sleep(1)
self.assertEqual([], found)
coroutine.sleep(1.5)
- self.assertEqual(['mnt/probe'], found)
+ self.assertEqual(['mnt/1'], found)
if __name__ == '__main__':
diff --git a/tests/units/mountset.py b/tests/units/mountset.py
index ba00908..1073e7c 100755
--- a/tests/units/mountset.py
+++ b/tests/units/mountset.py
@@ -15,7 +15,7 @@ from sugar_network.local.mountset import Mountset
from sugar_network.local.bus import IPCServer
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
-from sugar_network.toolkit import http
+from sugar_network.toolkit import http, mounts_monitor
from sugar_network import local, Client, ServerError, sugar, node
from sugar_network.resources.volume import Volume
from sugar_network.local.mounts import HomeMount, RemoteMount
@@ -47,6 +47,7 @@ class MountsetTest(tests.Test):
mounts.open()
mounts.opened.wait()
+ mounts_monitor.start(tests.tmpdir)
# Let `open()` start processing spawned jobs
coroutine.dispatch()
diff --git a/tests/units/node_mount.py b/tests/units/node_mount.py
index 04c54ab..8aa0043 100755
--- a/tests/units/node_mount.py
+++ b/tests/units/node_mount.py
@@ -15,6 +15,7 @@ from sugar_network.local.mounts import HomeMount
from sugar_network.local.mountset import Mountset
from sugar_network.local.bus import IPCServer
from sugar_network.local import activities
+from sugar_network.toolkit import mounts_monitor
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network import local, Client, sugar
@@ -53,6 +54,7 @@ class NodeMountTest(tests.Test):
Client.connect(events_cb)
mounts.open()
+ mounts_monitor.start(tests.tmpdir)
mounts.opened.wait()
# Let `open()` start processing spawned jobs
coroutine.dispatch()
diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py
index bff7ce2..a1d9295 100755
--- a/tests/units/sync_master.py
+++ b/tests/units/sync_master.py
@@ -11,6 +11,7 @@ from __init__ import tests
import active_document as ad
from active_document.directory import Directory
+from sugar_network import node
from sugar_network.toolkit.sneakernet import InPacket, OutPacket, OutBufferPacket
from sugar_network.toolkit.files_sync import Seeder
from sugar_network.node import sync_master
@@ -572,8 +573,9 @@ class SyncMasterTest(tests.Test):
[i for i in packet])
def test_pull_ProcessFilePulls(self):
+ node.sync_dirs.value = ['files']
seqno = ad.Seqno('seqno')
- master = MasterCommands('master', sync_dirs=['files'])
+ master = MasterCommands('master')
request = Request()
response = ad.Response()
@@ -700,15 +702,15 @@ class Request(ad.Request):
self.environ = environ or {}
-class MasterCommands(sync_master.Commands):
+class MasterCommands(sync_master.SyncCommands):
def __init__(self, master, **kwargs):
os.makedirs('db')
with file('db/master', 'w') as f:
f.write(master)
- sync_master.Commands._guid = master
- sync_master.Commands.volume = new_volume('db')
- sync_master.Commands.__init__(self, **kwargs)
+ sync_master.SyncCommands._guid = master
+ sync_master.SyncCommands.volume = new_volume('db')
+ sync_master.SyncCommands.__init__(self, **kwargs)
def new_volume(root):
diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py
index fc9a049..d943066 100755
--- a/tests/units/sync_node.py
+++ b/tests/units/sync_node.py
@@ -9,7 +9,8 @@ from __init__ import tests
import active_document as ad
from sugar_network.toolkit.sneakernet import InPacket, OutFilePacket
-from sugar_network.local import mounts, api_url
+from sugar_network.local import api_url
+from sugar_network.node import sync_node
from sugar_network.toolkit import sneakernet
from sugar_network.resources.volume import Volume
from active_toolkit import coroutine
@@ -27,12 +28,12 @@ class SyncNodeTest(tests.Test):
return str(self.uuid)
def test_Export(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1', prop='value1')
node.volume['document'].create(guid='2', prop='value2')
- node.sync('sync')
+ node.sync('mnt')
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, None]]},
{'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': '1', 'diff': {
@@ -53,14 +54,14 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, 2]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
def test_Export_NoPullForExistingSession(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1')
- node.sync('sync', session='session')
+ node.sync('mnt', session='session')
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 'session', 'diff': {
'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
@@ -72,10 +73,10 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 'session', 'sequence': [[1, 1]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
def test_LimittedExport(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1', prop='*' * 1024)
node.volume['document'].create(guid='2', prop='*' * 1024)
@@ -84,16 +85,16 @@ class SyncNodeTest(tests.Test):
node.volume['document'].create(guid='5', prop='*' * 1024)
node.volume['document'].create(guid='6', prop='*' * 1024)
- kwargs = node.sync('sync', accept_length=1024, session=0)
+ kwargs = node.sync('mnt', accept_length=1024, session=0)
self.assertEqual(0, kwargs['session'])
self.assertEqual([[1, None]], kwargs['push_sequence'])
- self.assertEqual([], self.read_packets('sync'))
+ self.assertEqual([], self.read_packets('mnt'))
- kwargs = node.sync('sync', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0)
+ kwargs = node.sync('mnt', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0)
self.assertEqual([[1, None]], kwargs['push_sequence'])
- self.assertEqual([], self.read_packets('sync'))
+ self.assertEqual([], self.read_packets('mnt'))
- kwargs = node.sync('sync', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1)
+ kwargs = node.sync('mnt', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1)
self.assertEqual([[2, None]], kwargs['push_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': {
@@ -106,9 +107,9 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 1, 'sequence': [[1, 1]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
- kwargs = node.sync('sync', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2)
+ kwargs = node.sync('mnt', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2)
self.assertEqual([[4, None]], kwargs['push_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': {
@@ -129,9 +130,9 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 2, 'sequence': [[2, 3]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
- kwargs = node.sync('sync', push_sequence=kwargs['push_sequence'], session=3)
+ kwargs = node.sync('mnt', push_sequence=kwargs['push_sequence'], session=3)
self.assertEqual(None, kwargs)
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': {
@@ -160,12 +161,12 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-6.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 3, 'sequence': [[4, 6]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
def test_Import(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
- master_packet = OutFilePacket('sync', src='master')
+ master_packet = OutFilePacket('mnt', src='master')
master_packet.push(data=[
{'cmd': 'sn_ack', 'dst': 'node', 'sequence': [[1, 2]], 'merged': [[3, 4]]},
{'cmd': 'sn_ack', 'dst': 'other', 'sequence': [[5, 6]], 'merged': [[7, 8]]},
@@ -174,21 +175,21 @@ class SyncNodeTest(tests.Test):
])
master_packet.close()
- our_packet = OutFilePacket('sync', src='node', dst='master', session='stale')
+ our_packet = OutFilePacket('mnt', src='node', dst='master', session='stale')
our_packet.push(data=[
{'cmd': 'sn_push', 'document': 'document', 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}},
{'cmd': 'sn_commit', 'sequence': [[9, 10]]},
])
our_packet.close()
- other_node_packet = OutFilePacket('sync', src='other', dst='master')
+ other_node_packet = OutFilePacket('mnt', src='other', dst='master')
other_node_packet.push(data=[
{'cmd': 'sn_push', 'document': 'document', 'guid': '3', 'diff': {'guid': {'value': '3', 'mtime': 3}}},
{'cmd': 'sn_commit', 'sequence': [[13, 14]]},
])
other_node_packet.close()
- node.sync('sync', session='new')
+ node.sync('mnt', session='new')
assert exists(master_packet.path)
assert exists(other_node_packet.path)
@@ -196,28 +197,28 @@ class SyncNodeTest(tests.Test):
self.assertEqual(
[[3, None]],
- json.load(file('db/push.sequence')))
+ json.load(file('sync/push')))
self.assertEqual(
[[1, 2], [5, 10], [13, None]],
- json.load(file('db/pull.sequence')))
+ json.load(file('sync/pull')))
self.assertEqual(
['2', '3'],
[i.guid for i in node.volume['document'].find()[0]])
def test_TakeIntoAccountJustReadAckPacket(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1', prop='prop1')
node.volume['document'].create(guid='2', prop='prop2')
node.volume['document'].create(guid='3', prop='prop3')
- master_packet = OutFilePacket('sync', src='master')
+ master_packet = OutFilePacket('mnt', src='master')
master_packet.push(data=[
{'cmd': 'sn_ack', 'dst': 'node', 'sequence': [[1, 2]], 'merged': []},
])
master_packet.close()
- node.sync('sync', session='session')
+ node.sync('mnt', session='session')
self.assertEqual([
{'filename': 'master.packet', 'content_type': 'records', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 2]], 'merged': []},
@@ -231,74 +232,73 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-4.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': 'session', 'sequence': [[3, 3]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
def test_Import_DoNotDeletePacketsFromCurrentSession(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1')
- existing_session = OutFilePacket('sync', src='node', dst='master', session='the same')
+ existing_session = OutFilePacket('mnt', src='node', dst='master', session='the same')
existing_session.push(data=[
{'cmd': 'sn_push', 'document': 'document', 'range': [1, 1], 'guid': '1', 'diff': {'guid': {'value': '1', 'mtime': 1}}},
])
existing_session.close()
- self.assertEqual(1, len([i for i in sneakernet.walk('sync')]))
- node.sync('sync', session='the same')
+ self.assertEqual(1, len([i for i in sneakernet.walk('mnt')]))
+ node.sync('mnt', session='the same')
self.assertEqual(
['the same', 'the same'],
- [i.header['session'] for i in sneakernet.walk('sync')])
+ [i.header['session'] for i in sneakernet.walk('mnt')])
assert exists(existing_session.path)
- node.sync('sync', session='new one')
+ node.sync('mnt', session='new one')
self.assertEqual(
['new one'],
- [i.header['session'] for i in sneakernet.walk('sync')])
+ [i.header['session'] for i in sneakernet.walk('mnt')])
def test_sync_session(self):
- node = NodeMount('node', 'master')
+ node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1', prop='*' * 1024)
coroutine.dispatch()
- node.publisher = lambda x: events.append(x)
self.override(os, 'statvfs', lambda x: Statvfs(1024 - 512))
- events = []
- node.sync_session(['sync'])
+ node.events = []
+ node.sync_session('mnt')
self.assertEqual([
- {'event': 'sync_start', 'path': 'sync'},
+ {'event': 'sync_start', 'path': 'mnt'},
{'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"},
{'event': 'sync_continue'},
],
- events)
- records = self.read_packets('sync')
+ node.events)
+ records = self.read_packets('mnt')
self.assertEqual(1, len(records))
self.assertEqual('sn_pull', records[0]['cmd'])
session = records[0]['session']
- events = []
- node.sync_session(['sync'])
+ node.events = []
+ node.sync_session('mnt')
self.assertEqual([
- {'path': 'sync', 'event': 'sync_start'},
+ {'path': 'mnt', 'event': 'sync_start'},
{'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"},
{'event': 'sync_continue'},
],
- events)
+ node.events)
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
self.override(os, 'statvfs', lambda x: Statvfs(1024 + 512))
- events = []
- node.sync_session(['sync'])
+ node.events = []
+ node.sync_session('mnt')
self.assertEqual([
- {'path': 'sync', 'event': 'sync_start'},
+ {'path': 'mnt', 'event': 'sync_start'},
{'event': 'sync_progress', 'progress': "Generating 'node-1.packet' packet"},
{'event': 'sync_complete'},
],
- events)
+ node.events)
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]},
{'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': session, 'diff': {
@@ -311,7 +311,7 @@ class SyncNodeTest(tests.Test):
}},
{'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_commit', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, 1]]},
],
- self.read_packets('sync'))
+ self.read_packets('mnt'))
def read_packets(self, path):
result = []
@@ -330,15 +330,18 @@ class Statvfs(object):
self.f_bfree = f_bfree
-class NodeMount(mounts.NodeMount):
+class SyncCommands(sync_node.SyncCommands):
def __init__(self, node, master):
- os.makedirs('db')
- with file('db/node', 'w') as f:
- f.write(node)
- with file('db/master', 'w') as f:
- f.write(master)
- mounts.NodeMount.__init__(self, new_volume('db'), None)
+ sync_node.SyncCommands.__init__(self, 'sync')
+ self.node_guid = node
+ self.master_guid = master
+ self.volume = new_volume('db')
+ self.node_mount = self
+ self.events = []
+
+ def publish(self, event):
+ self.events.append(event)
def new_volume(root):