Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2013-03-04 18:44:41 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2013-03-04 18:44:41 (GMT)
commitff383bb37b8793231ac5df904d47942ad657cd37 (patch)
treec16cce19b604255a25b6b58a55b7da75e362f7b1
parent9028f9880747ca19aaf921d750451c455b20349c (diff)
Fix personal node sync; clean up old subscriptions related code for client
-rw-r--r--TODO1
-rwxr-xr-xsugar-network-client29
-rw-r--r--sugar_network/client/mounts.py9
-rw-r--r--sugar_network/client/mountset.py27
-rw-r--r--sugar_network/db/volume.py2
-rw-r--r--sugar_network/node/__init__.py4
-rw-r--r--sugar_network/node/commands.py9
-rw-r--r--sugar_network/node/slave.py73
-rw-r--r--sugar_network/resources/volume.py80
-rw-r--r--sugar_network/toolkit/http.py22
-rw-r--r--sugar_network/toolkit/router.py10
-rw-r--r--sugar_network/toolkit/util.py43
-rw-r--r--tests/__init__.py2
-rwxr-xr-xtests/integration/master_slave.py6
-rwxr-xr-xtests/units/client/home_mount.py6
-rwxr-xr-xtests/units/client/mountset.py7
-rwxr-xr-xtests/units/client/node_mount.py4
-rwxr-xr-xtests/units/client/remote_mount.py7
-rwxr-xr-xtests/units/node/sync_offline.py10
-rwxr-xr-xtests/units/resources/volume.py10
-rwxr-xr-xtests/units/toolkit/util.py58
21 files changed, 262 insertions, 157 deletions
diff --git a/TODO b/TODO
index 34156c4..3208f06 100644
--- a/TODO
+++ b/TODO
@@ -20,3 +20,4 @@
- "Cannot find implementation for" error if there is no required sugar
- trace says that current sugar version is (ok)
- increase granularity for sync.chunked_encode()
+- slave._Pooler might leak events if pullers are not in time to call wait()
diff --git a/sugar-network-client b/sugar-network-client
index 7bd12c6..ce9ef00 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -34,7 +34,7 @@ from sugar_network.client.mounts import RemoteMount
from sugar_network.client.mountset import Mountset
from sugar_network.zerosugar import clones
from sugar_network.node import stats_user, slave
-from sugar_network.node.slave import SlaveCommands
+from sugar_network.node.slave import PersonalCommands
from sugar_network.resources.volume import Volume
from sugar_network.toolkit.router import Router
from sugar_network.toolkit import Option
@@ -46,6 +46,8 @@ class Application(application.Daemon):
def __init__(self, **kwargs):
application.Daemon.__init__(self, **kwargs)
+ node.sync_layers.value = client.layers.value
+
self.jobs = coroutine.Pool()
util.init_logging(application.debug.value)
@@ -120,7 +122,7 @@ class Application(application.Daemon):
logging.info('Start %r server on %s port',
volume.root, node.port.value)
server = coroutine.WSGIServer(('0.0.0.0', node.port.value),
- Router(SlaveCommands(sugar.uid(), volume)))
+ Router(PersonalCommands(mountset)))
self.jobs.spawn(server.serve_forever)
else:
mountset['/'] = RemoteMount(volume)
@@ -134,10 +136,7 @@ class Application(application.Daemon):
self.accept()
- def delayed_start(event=None):
- logging.info('Proceed delayed start')
- mountset.disconnect(delayed_start)
-
+ def final_start():
self._sync(mountset.volume)
self.jobs.spawn(clones.monitor, mountset.volume['context'],
client.activity_dirs.value)
@@ -149,13 +148,21 @@ class Application(application.Daemon):
self.jobs.spawn(server.serve_forever)
if client.mounts_root.value:
- self.jobs.spawn(mountpoints.monitor,
- abspath(client.mounts_root.value))
+ mounts_root = abspath(client.mounts_root.value)
+ if not exists(mounts_root):
+ os.makedirs(mounts_root)
+ self.jobs.spawn(mountpoints.monitor, mounts_root)
+
+ def delayed_start(event=None):
+ for __ in mountset.subscribe(event='delayed-start'):
+ break
+ logging.info('Proceed delayed start')
+ final_start()
if client.delayed_start.value:
- mountset.connect(delayed_start, event='delayed-start')
+ self.jobs.spawn(delayed_start)
else:
- delayed_start()
+ final_start()
try:
mountset.open()
@@ -215,7 +222,7 @@ Option.seek('main', [toolkit.cachedir])
Option.seek('webui', webui)
Option.seek('client', client)
Option.seek('client', [sugar.keyfile])
-Option.seek('node', [node.port, node.files_root])
+Option.seek('node', [node.port, node.files_root, node.stats_root])
Option.seek('stats', stats_user)
Option.seek('db', db)
diff --git a/sugar_network/client/mounts.py b/sugar_network/client/mounts.py
index 364dae5..76222ce 100644
--- a/sugar_network/client/mounts.py
+++ b/sugar_network/client/mounts.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# pylint: disable-msg=E1102
+
import os
import logging
from os.path import isabs, exists, join, basename
@@ -35,7 +37,7 @@ class _Mount(object):
def __init__(self):
self.mountpoint = None
- self.publisher = None
+ self.broadcast = None
self.mounted = coroutine.Event()
def __call__(self, response=None, **kwargs):
@@ -66,11 +68,6 @@ class _Mount(object):
'private': self.private,
})
- def broadcast(self, event):
- if self.publisher is not None:
- # pylint: disable-msg=E1102
- self.publisher(event)
-
class LocalMount(VolumeCommands, _Mount):
diff --git a/sugar_network/client/mountset.py b/sugar_network/client/mountset.py
index e64c83e..b5b9d5d 100644
--- a/sugar_network/client/mountset.py
+++ b/sugar_network/client/mountset.py
@@ -20,7 +20,7 @@ from os.path import join
from sugar_network import db, client, node
from sugar_network.toolkit import netlink, mountpoints, router
-from sugar_network.toolkit import coroutine, util, exception, enforce
+from sugar_network.toolkit import coroutine, util, enforce
from sugar_network.client import journal, zeroconf
from sugar_network.client.mounts import LocalMount, NodeMount
from sugar_network.zerosugar import clones, injector
@@ -36,7 +36,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands):
def __init__(self, home_volume):
self.opened = coroutine.Event()
- self._subscriptions = {}
self._jobs = coroutine.Pool()
self.node_mount = None
@@ -54,7 +53,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands):
def __setitem__(self, mountpoint, mount):
dict.__setitem__(self, mountpoint, mount)
mount.mountpoint = mountpoint
- mount.publisher = self.broadcast
+ mount.broadcast = self.broadcast
mount.set_mounted(True)
def __delitem__(self, mountpoint):
@@ -112,21 +111,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands):
mount.set_mounted(True)
return mount.mounted.is_set()
- @db.volume_command(method='POST', cmd='broadcast')
- def broadcast(self, event, request=None):
- if request is not None:
- event = request.content
-
- for callback, condition in self._subscriptions.items():
- for key, value in condition.items():
- if event.get(key) != value:
- break
- else:
- try:
- callback(event)
- except Exception:
- exception(_logger, 'Failed to dispatch %r', event)
-
@db.document_command(method='GET', cmd='make')
def make(self, mountpoint, document, guid):
enforce(document == 'context', 'Only contexts can be launched')
@@ -237,13 +221,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands):
except db.CommandNotFound:
return self.super_call(request, response)
- def connect(self, callback, condition=None, **kwargs):
- self._subscriptions[callback] = condition or kwargs
-
- def disconnect(self, callback):
- if callback in self._subscriptions:
- del self._subscriptions[callback]
-
def open(self):
try:
mountpoints.connect(_DB_DIRNAME,
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index fea5f1f..23ffca4 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -86,7 +86,7 @@ class Volume(dict):
def notify(self, event):
for callback, condition in self._subscriptions.items():
for key, value in condition.items():
- if event.get(key) not in ('*', value):
+ if event.get(key) != value:
break
else:
try:
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index eea6dcc..b017653 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -53,7 +53,7 @@ static_url = Option(
stats_root = Option(
'path to the root directory for placing stats',
- default='/var/lib/sugar-network/stats')
+ default='/var/lib/sugar-network/stats', name='stats_root')
files_root = Option(
'path to a directory to keep files synchronized between nodes',
@@ -64,7 +64,7 @@ pull_timeout = Option(
'pull request will be ready',
default=30, type_cast=int)
-layers = Option(
+sync_layers = Option(
'space separated list of layers to restrict Sugar Network '
'synchronization content',
default=['pilot'], type_cast=Option.list_cast,
diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py
index 842b685..02a223b 100644
--- a/sugar_network/node/commands.py
+++ b/sugar_network/node/commands.py
@@ -43,6 +43,8 @@ class NodeCommands(VolumeCommands, Commands):
self._stats = stats_node.Sniffer(volume)
coroutine.spawn(self._commit_stats)
+ self.volume.connect(self.broadcast)
+
@property
def guid(self):
return self._guid
@@ -177,10 +179,6 @@ class NodeCommands(VolumeCommands, Commands):
guid=impls[0]['guid'], prop='data')
return self.call(request, db.Response())
- def broadcast(self, event):
- # TODO Node level broadcast events?
- _logger.info('Publish event: %r', event)
-
def call(self, request, response=None):
try:
result = VolumeCommands.call(self, request, response)
@@ -213,9 +211,6 @@ class NodeCommands(VolumeCommands, Commands):
return cmd
- def connect(self, callback, condition=None, **kwargs):
- self.volume.connect(callback, condition)
-
def before_create(self, request, props):
if request['document'] == 'user':
props['guid'], props['pubkey'] = _load_pubkey(props['pubkey'])
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 46a102a..4050d0b 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -25,9 +25,13 @@ from sugar_network import db, node
from sugar_network.client import Client, api_url
from sugar_network.node import sync, stats_user, files, volume
from sugar_network.node.commands import NodeCommands
-from sugar_network.toolkit import util, exception, enforce
+from sugar_network.toolkit import mountpoints
+from sugar_network.toolkit import sugar, coroutine, util, exception, enforce
+# Flag file to recognize a directory as a synchronization directory
+_SYNC_DIRNAME = '.sugar-network-sync'
+
_logger = logging.getLogger('node.slave')
@@ -51,7 +55,7 @@ class SlaveCommands(NodeCommands):
push = [('diff', None, volume.diff(self.volume, self._push_seq)),
('pull', {
'sequence': self._pull_seq,
- 'layer': node.layers.value,
+ 'layer': node.sync_layers.value,
}, None),
('files_pull', {'sequence': self._files_seq}, None),
]
@@ -67,7 +71,8 @@ class SlaveCommands(NodeCommands):
@db.volume_command(method='POST', cmd='offline-sync',
permissions=db.ACCESS_LOCAL)
def offline_sync(self, path):
- enforce(node.layers.value and 'public' not in node.layers.value,
+ enforce(node.sync_layers.value and
+ 'public' not in node.sync_layers.value,
'--layers is not specified, the full master dump might be '
'too big and should be limited')
enforce(isabs(path), 'Argument \'path\' should be an absolute path')
@@ -79,22 +84,18 @@ class SlaveCommands(NodeCommands):
os.makedirs(path)
try:
- self.broadcast({'event': 'sync_start', 'path': path})
self._offline_session = self._offline_sync(path,
**(self._offline_session or {}))
- except Exception, error:
+ except Exception:
exception(_logger, 'Failed to complete synchronization')
- self.broadcast({'event': 'sync_error', 'error': str(error)})
self._offline_session = None
raise
if self._offline_session is None:
_logger.debug('Synchronization completed')
- self.broadcast({'event': 'sync_complete'})
else:
_logger.debug('Postpone synchronization with %r session',
self._offline_session)
- self.broadcast({'event': 'sync_continue'})
def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None):
push = []
@@ -107,7 +108,7 @@ class SlaveCommands(NodeCommands):
session = db.uuid()
push.append(('pull', {
'sequence': self._pull_seq,
- 'layer': node.layers.value,
+ 'layer': node.sync_layers.value,
}, None))
push.append(('files_pull', {'sequence': self._files_seq}, None))
@@ -171,3 +172,57 @@ class SlaveCommands(NodeCommands):
if seq:
self._files_seq.exclude(seq)
self._files_seq.commit()
+
+
+class PersonalCommands(SlaveCommands):
+
+ def __init__(self, mountset):
+ SlaveCommands.__init__(self, sugar.uid(), mountset.volume)
+
+ self._mountset = mountset
+ self._mounts = util.Pool()
+ self._jobs = coroutine.Pool()
+
+ mountpoints.connect(_SYNC_DIRNAME,
+ self.__found_mountcb, self.__lost_mount_cb)
+
+ def broadcast(self, event):
+ self._mountset.broadcast(event)
+
+ def subscribe(self, *args, **kwargs):
+ return self._mountset.subscribe(*args, **kwargs)
+
+ def _sync_mounts(self):
+ self.broadcast({'event': 'sync_start'})
+
+ for mountpoint in self._mounts:
+ self.broadcast({'event': 'sync_next', 'path': mountpoint})
+ try:
+ self._offline_session = self._offline_sync(mountpoint,
+ **(self._offline_session or {}))
+ except Exception, error:
+ exception(_logger, 'Failed to complete synchronization')
+ self.broadcast({'event': 'sync_abort', 'error': str(error)})
+ self._offline_session = None
+ raise
+
+ if self._offline_session is None:
+ _logger.debug('Synchronization completed')
+ self.broadcast({'event': 'sync_complete'})
+ else:
+ _logger.debug('Postpone synchronization with %r session',
+ self._offline_session)
+ self.broadcast({'event': 'sync_paused'})
+
+ def __found_mountcb(self, path):
+ self._mounts.add(path)
+ if self._jobs:
+ _logger.debug('Found %r sync mount, pool it', path)
+ else:
+ _logger.debug('Found %r sync mount, start synchronization', path)
+ self._jobs.spawn(self._sync_mounts)
+
+ def __lost_mount_cb(self, path):
+ if self._mounts.remove(path) == util.Pool.ACTIVE:
+ _logger.warning('%r was unmounted, break synchronization', path)
+ self._jobs.kill()
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py
index a6657d8..f3729b6 100644
--- a/sugar_network/resources/volume.py
+++ b/sugar_network/resources/volume.py
@@ -162,11 +162,12 @@ class Volume(db.Volume):
class Commands(object):
def __init__(self):
- self._notifier = coroutine.AsyncResult()
- self.connect(lambda event: self._notify(event))
+ self._pooler = _Pooler()
- def connect(self, callback, condition=None, **kwargs):
- raise NotImplementedError()
+ def broadcast(self, event):
+ _logger.debug('Publish event: %r', event)
+ self._pooler.notify_all(event)
+ coroutine.dispatch()
@router.route('GET', '/robots.txt')
def robots(self, request, response):
@@ -185,46 +186,34 @@ class Commands(object):
@db.volume_command(method='GET', cmd='subscribe',
mime_type='application/json')
- def subscribe(self, request=None, response=None, only_commits=False):
- """Subscribe to Server-Sent Events.
-
- :param only_commits:
- subscribers can be notified only with "commit" events;
- that is useful to minimize interactions between server and clients
-
- """
+ def subscribe(self, request=None, response=None, **condition):
+ """Subscribe to Server-Sent Events."""
+ if request is not None and not condition:
+ condition = request.query
if response is not None:
response.content_type = 'text/event-stream'
response['Cache-Control'] = 'no-cache'
peer = 'anonymous'
if hasattr(request, 'environ'):
peer = request.environ.get('HTTP_SUGAR_USER') or peer
- return self._pull_events(peer, only_commits)
+ return self._pull_events(peer, condition)
- def _pull_events(self, peer, only_commits):
+ def _pull_events(self, peer, condition):
_logger.debug('Start pulling events to %s user', peer)
-
- yield 'data: %s\n\n' % json.dumps({'event': 'handshake'})
try:
while True:
- event = self._notifier.get()
- if only_commits:
- if event['event'] != 'commit':
- continue
+ event = self._pooler.wait()
+ for key, value in condition.items():
+ if value.startswith('!'):
+ if event.get(key) == value[1:]:
+ break
+ elif event.get(key) != value:
+ break
else:
- if event['event'] == 'commit':
- # Subscribers already got update notifications enough
- continue
- yield 'data: %s\n\n' % json.dumps(event)
+ yield 'data: %s\n\n' % json.dumps(event)
finally:
_logger.debug('Stop pulling events to %s user', peer)
- def _notify(self, event):
- _logger.debug('Publish event: %r', event)
- self._notifier.set(event)
- self._notifier = coroutine.AsyncResult()
- coroutine.dispatch()
-
class VolumeCommands(db.VolumeCommands):
@@ -315,6 +304,37 @@ class VolumeCommands(db.VolumeCommands):
props[name] = url
+class _Pooler(object):
+ """One-producer-to-many-consumers events delivery."""
+
+ def __init__(self):
+ self._value = None
+ self._waiters = 0
+ self._ready = coroutine.Event()
+ self._open = coroutine.Event()
+ self._open.set()
+
+ def wait(self):
+ self._open.wait()
+ self._waiters += 1
+ try:
+ self._ready.wait()
+ finally:
+ self._waiters -= 1
+ if self._waiters == 0:
+ self._ready.clear()
+ self._open.set()
+ return self._value
+
+ def notify_all(self, value=None):
+ self._open.wait()
+ if not self._waiters:
+ return
+ self._open.clear()
+ self._value = value
+ self._ready.set()
+
+
_HELLO_HTML = """\
<h2>Welcome to Sugar Network API!</h2>
Consult <a href="http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network/API">
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index ee6ba7f..caabc46 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -193,8 +193,8 @@ class Client(object):
result = reply.raw
return result
- def subscribe(self):
- return _Subscription(self, _RECONNECTION_NUMBER)
+ def subscribe(self, **condition):
+ return _Subscription(self, condition, _RECONNECTION_NUMBER)
def _register(self):
self.post(['user'], {
@@ -214,10 +214,12 @@ class Client(object):
class _Subscription(object):
- def __init__(self, aclient, tries):
+ def __init__(self, aclient, condition, tries):
self._tries = tries or 1
self._client = aclient
- self._response = None
+ self._content = None
+ self._params = condition
+ self._params['cmd'] = 'subscribe'
def __iter__(self):
while True:
@@ -241,7 +243,7 @@ class _Subscription(object):
raise
exception('Failed to read from %r subscription, '
'will resubscribe', self._client.api_url)
- self._response = None
+ self._content = None
if line.startswith('data: '):
try:
@@ -251,15 +253,14 @@ class _Subscription(object):
line, self._client.api_url)
def _handshake(self):
- if self._response is not None:
- return self._response.raw
+ if self._content is not None:
+ return self._content
_logger.debug('Subscribe to %r', self._client.api_url)
for a_try in reversed(xrange(self._tries)):
try:
- self._response = self._client.request('GET',
- params={'cmd': 'subscribe'})
+ response = self._client.request('GET', params=self._params)
break
except Exception:
if a_try == 0:
@@ -269,7 +270,8 @@ class _Subscription(object):
self._client.api_url, _RECONNECTION_TIMEOUT)
coroutine.sleep(_RECONNECTION_TIMEOUT)
- return self._response.raw
+ self._content = response.raw
+ return self._content
def _sign(privkey_path, data):
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index afb9c60..657621e 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -336,18 +336,22 @@ class _Request(Request):
self.accept_language = _parse_accept_language(
environ.get('HTTP_ACCEPT_LANGUAGE'))
self.principal = None
+ self.query = {}
enforce('..' not in self.path, 'Relative url path')
query = environ.get('QUERY_STRING') or ''
for attr, value in parse_qsl(query):
+ attr = str(attr)
param = self.get(attr)
if type(param) is list:
param.append(value)
- elif param is not None:
- self[str(attr)] = [param, value]
else:
- self[str(attr)] = value
+ if param is not None:
+ value = [param, value]
+ self[attr] = value
+ if attr != 'cmd':
+ self.query[attr] = value
if query:
self.url += '?' + query
diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py
index 728a4ed..8171950 100644
--- a/sugar_network/toolkit/util.py
+++ b/sugar_network/toolkit/util.py
@@ -533,50 +533,53 @@ class PersistentSequence(Sequence):
os.fsync(f.fileno())
-class MutableStack(object):
+class Pool(object):
"""Stack that keeps its iterators correct after changing content."""
+ QUEUED = 0
+ ACTIVE = 1
+ PASSED = 2
+
def __init__(self):
self._queue = collections.deque()
def add(self, value):
self.remove(value)
- self._queue.appendleft([False, value])
+ self._queue.appendleft([Pool.QUEUED, value])
def remove(self, value):
- for i, (__, existing) in enumerate(self._queue):
+ for i, (state, existing) in enumerate(self._queue):
if existing == value:
del self._queue[i]
- break
+ return state
+
+ def get_state(self, value):
+ for state, existing in self._queue:
+ if existing == value:
+ return state
def rewind(self):
for i in self._queue:
- i[0] = False
+ i[0] = Pool.QUEUED
def __len__(self):
return len(self._queue)
def __iter__(self):
- return _MutableStackIterator(self._queue)
+ for i in self._queue:
+ state, value = i
+ if state == Pool.PASSED:
+ continue
+ try:
+ i[0] = Pool.ACTIVE
+ yield value
+ finally:
+ i[0] = Pool.PASSED
def __repr__(self):
return str([i[1] for i in self._queue])
-class _MutableStackIterator(object):
-
- def __init__(self, queue):
- self._queue = queue
-
- def next(self):
- for i in self._queue:
- processed, value = i
- if not processed:
- i[0] = True
- return value
- raise StopIteration()
-
-
class _NullHandler(logging.Handler):
def emit(self, record):
diff --git a/tests/__init__.py b/tests/__init__.py
index c62661a..74811a5 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -83,7 +83,7 @@ class Test(unittest.TestCase):
node.data_root.value = tmpdir
node.static_url.value = None
node.files_root.value = None
- node.layers.value = None
+ node.sync_layers.value = None
db.index_write_queue.value = 10
client.local_root.value = tmpdir
client.activity_dirs.value = [tmpdir + '/Activities']
diff --git a/tests/integration/master_slave.py b/tests/integration/master_slave.py
index 9d35ed8..0b1fb87 100755
--- a/tests/integration/master_slave.py
+++ b/tests/integration/master_slave.py
@@ -45,7 +45,7 @@ class MasterSlaveTest(tests.Test):
'-DDD', '--rundir=slave/run', '--files-root=slave/files',
'--stats-root=slave/stats', '--stats-user', '--stats-user-step=1',
'--stats-user-rras=RRA:AVERAGE:0.5:1:100',
- '--index-flush-threshold=1', '--layers=pilot',
+ '--index-flush-threshold=1', '--sync-layers=pilot',
])
coroutine.sleep(2)
@@ -71,6 +71,7 @@ class MasterSlaveTest(tests.Test):
'summary': 'summary',
'description': 'description',
'preview': 'preview1',
+ 'layer': 'pilot',
})
self.touch(('master/files/file1', 'file1'))
@@ -80,6 +81,7 @@ class MasterSlaveTest(tests.Test):
'summary': 'summary',
'description': 'description',
'preview': 'preview2',
+ 'layer': 'pilot',
})
slave.post(['user', tests.UID], {
'name': 'db',
@@ -119,6 +121,7 @@ class MasterSlaveTest(tests.Test):
'summary': 'summary',
'description': 'description',
'preview': 'preview3',
+ 'layer': 'pilot',
})
master.put(['context', context1, 'title'], 'title1_')
master.put(['context', context2, 'preview'], 'preview2_')
@@ -131,6 +134,7 @@ class MasterSlaveTest(tests.Test):
'summary': 'summary',
'description': 'description',
'preview': 'preview4',
+ 'layer': 'pilot',
})
slave.put(['context', context2, 'title'], 'title2_')
slave.put(['context', context1, 'preview'], 'preview1_')
diff --git a/tests/units/client/home_mount.py b/tests/units/client/home_mount.py
index 1014756..6735311 100755
--- a/tests/units/client/home_mount.py
+++ b/tests/units/client/home_mount.py
@@ -164,7 +164,7 @@ class HomeMountTest(tests.Test):
events = []
def read_events():
- for event in local.subscribe():
+ for event in local.subscribe(event='!commit'):
if 'props' in event:
event.pop('props')
events.append(event)
@@ -186,7 +186,6 @@ class HomeMountTest(tests.Test):
job.kill()
self.assertEqual([
- {'event': 'handshake'},
{'guid': guid, 'document': 'context', 'event': 'create', 'mountpoint': '~'},
{'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~'},
{'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '~'},
@@ -206,7 +205,7 @@ class HomeMountTest(tests.Test):
})
def read_events():
- for event in local.subscribe():
+ for event in IPCClient().subscribe(event='!commit'):
events.append(event)
job = coroutine.spawn(read_events)
coroutine.sleep(.1)
@@ -218,7 +217,6 @@ class HomeMountTest(tests.Test):
job.kill()
self.assertEqual([
- {'event': 'handshake'},
{'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~', 'props': {'title': 'title_2'}},
{'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '/', 'props': {'favorite': True}},
{'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~', 'props': {'favorite': True}},
diff --git a/tests/units/client/mountset.py b/tests/units/client/mountset.py
index 0fe34f4..5e4eda3 100755
--- a/tests/units/client/mountset.py
+++ b/tests/units/client/mountset.py
@@ -55,8 +55,7 @@ class MountsetTest(tests.Test):
if 'props' in event:
event.pop('props')
self.events.append(event)
- if event['event'] != 'handshake':
- self.mounted.set()
+ self.mounted.set()
coroutine.dispatch()
self.events_job = coroutine.spawn(read_events)
@@ -77,7 +76,6 @@ class MountsetTest(tests.Test):
self.assertEqual(
sorted([
- {'event': 'handshake'},
{'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
{'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'},
]),
@@ -100,7 +98,6 @@ class MountsetTest(tests.Test):
coroutine.sleep(.5)
self.assertEqual([
- {'event': 'handshake'},
{'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
],
self.events)
@@ -116,7 +113,6 @@ class MountsetTest(tests.Test):
self.mounted.clear()
self.assertEqual([
- {'event': 'handshake'},
{'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'},
{'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'},
],
@@ -169,7 +165,6 @@ class MountsetTest(tests.Test):
self.mounted.clear()
self.assertEqual([
- {'event': 'handshake'},
{'mountpoint': tests.tmpdir + '/mnt', 'event': 'mount', 'private': False, 'name': 'mnt'},
],
self.events)
diff --git a/tests/units/client/node_mount.py b/tests/units/client/node_mount.py
index e7bdcda..349c6e3 100755
--- a/tests/units/client/node_mount.py
+++ b/tests/units/client/node_mount.py
@@ -65,8 +65,7 @@ class NodeMountTest(tests.Test):
if 'props' in event:
event.pop('props')
events.append(event)
- if event['event'] != 'handshake':
- got_event.set()
+ got_event.set()
job = coroutine.spawn(read_events)
guid = remote.post(['context'], {
@@ -78,7 +77,6 @@ class NodeMountTest(tests.Test):
got_event.wait()
got_event.clear()
self.assertEqual([
- {'event': 'handshake'},
{'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1},
],
events)
diff --git a/tests/units/client/remote_mount.py b/tests/units/client/remote_mount.py
index bd754e1..9813f94 100755
--- a/tests/units/client/remote_mount.py
+++ b/tests/units/client/remote_mount.py
@@ -35,7 +35,7 @@ class RemoteMountTest(tests.Test):
events = []
def read_events():
- for event in remote.subscribe():
+ for event in remote.subscribe(event='!commit'):
if 'props' in event:
event.pop('props')
events.append(event)
@@ -57,8 +57,6 @@ class RemoteMountTest(tests.Test):
job.kill()
self.assertEqual([
- {'event': 'handshake'},
- {'event': 'handshake', 'mountpoint': '/'},
{'guid': guid, 'document': 'context', 'event': 'create', 'mountpoint': '/'},
{'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '/'},
{'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '/'},
@@ -92,9 +90,7 @@ class RemoteMountTest(tests.Test):
self.assertEqual(True, remote.get(cmd='mounted'))
self.assertEqual([
- {'event': 'handshake'},
{'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
- {'event': 'handshake', 'mountpoint': '/'},
],
events)
del events[:]
@@ -117,7 +113,6 @@ class RemoteMountTest(tests.Test):
self.assertEqual(True, remote.get(cmd='mounted'))
self.assertEqual([
{'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False},
- {'event': 'handshake', 'mountpoint': '/'},
],
events)
del events[:]
diff --git a/tests/units/node/sync_offline.py b/tests/units/node/sync_offline.py
index cf8091d..f6b8ce3 100755
--- a/tests/units/node/sync_offline.py
+++ b/tests/units/node/sync_offline.py
@@ -36,7 +36,7 @@ class SyncOfflineTest(tests.Test):
statvfs.f_bfree = 999999999
stats_user.stats_user_step.value = 1
stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100']
- node.layers.value = 'pilot'
+ node.sync_layers.value = 'pilot'
def next_uuid(self):
self.uuid += 1
@@ -50,13 +50,13 @@ class SyncOfflineTest(tests.Test):
volume = Volume('node', [Document])
cp = SlaveCommands('node', volume)
- node.layers.value = None
+ node.sync_layers.value = None
self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt')
- node.layers.value = 'public'
+ node.sync_layers.value = 'public'
self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt')
- node.layers.value = ['public']
+ node.sync_layers.value = ['public']
self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt')
- node.layers.value = ['public', 'foo']
+ node.sync_layers.value = ['public', 'foo']
self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt')
def test_Export(self):
diff --git a/tests/units/resources/volume.py b/tests/units/resources/volume.py
index 0ea3fb4..afc725f 100755
--- a/tests/units/resources/volume.py
+++ b/tests/units/resources/volume.py
@@ -48,7 +48,7 @@ class VolumeTest(tests.Test):
events = []
def read_events():
- for event in cp.subscribe(Request(), db.Response()):
+ for event in cp.subscribe(event='!commit'):
if not event.strip():
continue
assert event.startswith('data: ')
@@ -71,14 +71,13 @@ class VolumeTest(tests.Test):
job.kill()
self.assertEqual([
- {'event': 'handshake'},
{'guid': 'guid', 'document': 'document', 'event': 'create'},
{'guid': 'guid', 'document': 'document', 'event': 'update'},
{'guid': 'guid', 'event': 'delete', 'document': u'document'},
],
events)
- def test_SubscribeToOnlyCommits(self):
+ def __test_SubscribeCondition(self):
class Document(Resource):
@@ -114,7 +113,6 @@ class VolumeTest(tests.Test):
job.kill()
self.assertEqual([
- {'event': 'handshake'},
{'document': 'document', 'event': 'commit'},
{'document': 'document', 'event': 'commit'},
{'document': 'document', 'event': 'commit'},
@@ -498,9 +496,7 @@ class TestCommands(VolumeCommands, Commands):
def __init__(self, volume):
VolumeCommands.__init__(self, volume)
Commands.__init__(self)
-
- def connect(self, callback, condition=None, **kwargs):
- self.volume.connect(callback, condition)
+ self.volume.connect(self.broadcast)
def call(cp, principal=None, content=None, **kwargs):
diff --git a/tests/units/toolkit/util.py b/tests/units/toolkit/util.py
index 44bb9e4..ada713d 100755
--- a/tests/units/toolkit/util.py
+++ b/tests/units/toolkit/util.py
@@ -363,6 +363,64 @@ class UtilTest(tests.Test):
self.assertEqual(['\n', 'b'], readlines('\nb'))
self.assertEqual([' \n', ' b \n'], readlines(' \n b \n'))
+ def test_Pool(self):
+ stack = util.Pool()
+
+ stack.add('a')
+ stack.add('b')
+ stack.add('c')
+
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('a'))
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('b'))
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('c'))
+ self.assertEqual(
+ [('c', util.Pool.ACTIVE), ('b', util.Pool.ACTIVE), ('a', util.Pool.ACTIVE)],
+ [(i, stack.get_state(i)) for i in stack])
+ self.assertEqual(
+ [],
+ [i for i in stack])
+ self.assertEqual(util.Pool.PASSED, stack.get_state('a'))
+ self.assertEqual(util.Pool.PASSED, stack.get_state('b'))
+ self.assertEqual(util.Pool.PASSED, stack.get_state('c'))
+
+ stack.rewind()
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('a'))
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('b'))
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('c'))
+ self.assertEqual(
+ ['c', 'b', 'a'],
+ [i for i in stack])
+
+ stack.add('c')
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('c'))
+ self.assertEqual(
+ [('c', util.Pool.ACTIVE)],
+ [(i, stack.get_state(i)) for i in stack])
+ self.assertEqual(util.Pool.PASSED, stack.get_state('c'))
+
+ stack.add('b')
+ stack.add('a')
+ self.assertEqual(
+ ['a', 'b'],
+ [i for i in stack])
+
+ stack.rewind()
+ self.assertEqual(
+ ['a', 'b', 'c'],
+ [i for i in stack])
+
+ stack.add('d')
+ self.assertEqual(util.Pool.QUEUED, stack.get_state('d'))
+ self.assertEqual(
+ [('d', util.Pool.ACTIVE)],
+ [(i, stack.get_state(i)) for i in stack])
+ self.assertEqual(util.Pool.PASSED, stack.get_state('d'))
+
+ stack.rewind()
+ self.assertEqual(
+ ['d', 'a', 'b', 'c'],
+ [i for i in stack])
+
if __name__ == '__main__':
tests.main()