diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2013-03-04 18:44:41 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2013-03-04 18:44:41 (GMT) |
commit | ff383bb37b8793231ac5df904d47942ad657cd37 (patch) | |
tree | c16cce19b604255a25b6b58a55b7da75e362f7b1 | |
parent | 9028f9880747ca19aaf921d750451c455b20349c (diff) |
Fix personal node sync; clean up old subscriptions related code for client
-rw-r--r-- | TODO | 1 | ||||
-rwxr-xr-x | sugar-network-client | 29 | ||||
-rw-r--r-- | sugar_network/client/mounts.py | 9 | ||||
-rw-r--r-- | sugar_network/client/mountset.py | 27 | ||||
-rw-r--r-- | sugar_network/db/volume.py | 2 | ||||
-rw-r--r-- | sugar_network/node/__init__.py | 4 | ||||
-rw-r--r-- | sugar_network/node/commands.py | 9 | ||||
-rw-r--r-- | sugar_network/node/slave.py | 73 | ||||
-rw-r--r-- | sugar_network/resources/volume.py | 80 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 22 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 10 | ||||
-rw-r--r-- | sugar_network/toolkit/util.py | 43 | ||||
-rw-r--r-- | tests/__init__.py | 2 | ||||
-rwxr-xr-x | tests/integration/master_slave.py | 6 | ||||
-rwxr-xr-x | tests/units/client/home_mount.py | 6 | ||||
-rwxr-xr-x | tests/units/client/mountset.py | 7 | ||||
-rwxr-xr-x | tests/units/client/node_mount.py | 4 | ||||
-rwxr-xr-x | tests/units/client/remote_mount.py | 7 | ||||
-rwxr-xr-x | tests/units/node/sync_offline.py | 10 | ||||
-rwxr-xr-x | tests/units/resources/volume.py | 10 | ||||
-rwxr-xr-x | tests/units/toolkit/util.py | 58 |
21 files changed, 262 insertions, 157 deletions
@@ -20,3 +20,4 @@ - "Cannot find implementation for" error if there is no required sugar - trace says that current sugar version is (ok) - increase granularity for sync.chunked_encode() +- slave._Pooler might leak events if pullers are not in time to call wait() diff --git a/sugar-network-client b/sugar-network-client index 7bd12c6..ce9ef00 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -34,7 +34,7 @@ from sugar_network.client.mounts import RemoteMount from sugar_network.client.mountset import Mountset from sugar_network.zerosugar import clones from sugar_network.node import stats_user, slave -from sugar_network.node.slave import SlaveCommands +from sugar_network.node.slave import PersonalCommands from sugar_network.resources.volume import Volume from sugar_network.toolkit.router import Router from sugar_network.toolkit import Option @@ -46,6 +46,8 @@ class Application(application.Daemon): def __init__(self, **kwargs): application.Daemon.__init__(self, **kwargs) + node.sync_layers.value = client.layers.value + self.jobs = coroutine.Pool() util.init_logging(application.debug.value) @@ -120,7 +122,7 @@ class Application(application.Daemon): logging.info('Start %r server on %s port', volume.root, node.port.value) server = coroutine.WSGIServer(('0.0.0.0', node.port.value), - Router(SlaveCommands(sugar.uid(), volume))) + Router(PersonalCommands(mountset))) self.jobs.spawn(server.serve_forever) else: mountset['/'] = RemoteMount(volume) @@ -134,10 +136,7 @@ class Application(application.Daemon): self.accept() - def delayed_start(event=None): - logging.info('Proceed delayed start') - mountset.disconnect(delayed_start) - + def final_start(): self._sync(mountset.volume) self.jobs.spawn(clones.monitor, mountset.volume['context'], client.activity_dirs.value) @@ -149,13 +148,21 @@ class Application(application.Daemon): self.jobs.spawn(server.serve_forever) if client.mounts_root.value: - self.jobs.spawn(mountpoints.monitor, - abspath(client.mounts_root.value)) + mounts_root = abspath(client.mounts_root.value) + if not exists(mounts_root): + os.makedirs(mounts_root) + self.jobs.spawn(mountpoints.monitor, mounts_root) + + def delayed_start(event=None): + for __ in mountset.subscribe(event='delayed-start'): + break + logging.info('Proceed delayed start') + final_start() if client.delayed_start.value: - mountset.connect(delayed_start, event='delayed-start') + self.jobs.spawn(delayed_start) else: - delayed_start() + final_start() try: mountset.open() @@ -215,7 +222,7 @@ Option.seek('main', [toolkit.cachedir]) Option.seek('webui', webui) Option.seek('client', client) Option.seek('client', [sugar.keyfile]) -Option.seek('node', [node.port, node.files_root]) +Option.seek('node', [node.port, node.files_root, node.stats_root]) Option.seek('stats', stats_user) Option.seek('db', db) diff --git a/sugar_network/client/mounts.py b/sugar_network/client/mounts.py index 364dae5..76222ce 100644 --- a/sugar_network/client/mounts.py +++ b/sugar_network/client/mounts.py @@ -13,6 +13,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +# pylint: disable-msg=E1102 + import os import logging from os.path import isabs, exists, join, basename @@ -35,7 +37,7 @@ class _Mount(object): def __init__(self): self.mountpoint = None - self.publisher = None + self.broadcast = None self.mounted = coroutine.Event() def __call__(self, response=None, **kwargs): @@ -66,11 +68,6 @@ class _Mount(object): 'private': self.private, }) - def broadcast(self, event): - if self.publisher is not None: - # pylint: disable-msg=E1102 - self.publisher(event) - class LocalMount(VolumeCommands, _Mount): diff --git a/sugar_network/client/mountset.py b/sugar_network/client/mountset.py index e64c83e..b5b9d5d 100644 --- a/sugar_network/client/mountset.py +++ b/sugar_network/client/mountset.py @@ -20,7 +20,7 @@ from os.path import join from sugar_network import db, client, node from sugar_network.toolkit import netlink, mountpoints, router -from sugar_network.toolkit import coroutine, util, exception, enforce +from sugar_network.toolkit import coroutine, util, enforce from sugar_network.client import journal, zeroconf from sugar_network.client.mounts import LocalMount, NodeMount from sugar_network.zerosugar import clones, injector @@ -36,7 +36,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands): def __init__(self, home_volume): self.opened = coroutine.Event() - self._subscriptions = {} self._jobs = coroutine.Pool() self.node_mount = None @@ -54,7 +53,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands): def __setitem__(self, mountpoint, mount): dict.__setitem__(self, mountpoint, mount) mount.mountpoint = mountpoint - mount.publisher = self.broadcast + mount.broadcast = self.broadcast mount.set_mounted(True) def __delitem__(self, mountpoint): @@ -112,21 +111,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands): mount.set_mounted(True) return mount.mounted.is_set() - @db.volume_command(method='POST', cmd='broadcast') - def broadcast(self, event, request=None): - if request is not None: - event = request.content - - for callback, condition in self._subscriptions.items(): - for key, value in condition.items(): - if event.get(key) != value: - break - else: - try: - callback(event) - except Exception: - exception(_logger, 'Failed to dispatch %r', event) - @db.document_command(method='GET', cmd='make') def make(self, mountpoint, document, guid): enforce(document == 'context', 'Only contexts can be launched') @@ -237,13 +221,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands): except db.CommandNotFound: return self.super_call(request, response) - def connect(self, callback, condition=None, **kwargs): - self._subscriptions[callback] = condition or kwargs - - def disconnect(self, callback): - if callback in self._subscriptions: - del self._subscriptions[callback] - def open(self): try: mountpoints.connect(_DB_DIRNAME, diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index fea5f1f..23ffca4 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -86,7 +86,7 @@ class Volume(dict): def notify(self, event): for callback, condition in self._subscriptions.items(): for key, value in condition.items(): - if event.get(key) not in ('*', value): + if event.get(key) != value: break else: try: diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index eea6dcc..b017653 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -53,7 +53,7 @@ static_url = Option( stats_root = Option( 'path to the root directory for placing stats', - default='/var/lib/sugar-network/stats') + default='/var/lib/sugar-network/stats', name='stats_root') files_root = Option( 'path to a directory to keep files synchronized between nodes', @@ -64,7 +64,7 @@ pull_timeout = Option( 'pull request will be ready', default=30, type_cast=int) -layers = Option( +sync_layers = Option( 'space separated list of layers to restrict Sugar Network ' 'synchronization content', default=['pilot'], type_cast=Option.list_cast, diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index 842b685..02a223b 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -43,6 +43,8 @@ class NodeCommands(VolumeCommands, Commands): self._stats = stats_node.Sniffer(volume) coroutine.spawn(self._commit_stats) + self.volume.connect(self.broadcast) + @property def guid(self): return self._guid @@ -177,10 +179,6 @@ class NodeCommands(VolumeCommands, Commands): guid=impls[0]['guid'], prop='data') return self.call(request, db.Response()) - def broadcast(self, event): - # TODO Node level broadcast events? - _logger.info('Publish event: %r', event) - def call(self, request, response=None): try: result = VolumeCommands.call(self, request, response) @@ -213,9 +211,6 @@ class NodeCommands(VolumeCommands, Commands): return cmd - def connect(self, callback, condition=None, **kwargs): - self.volume.connect(callback, condition) - def before_create(self, request, props): if request['document'] == 'user': props['guid'], props['pubkey'] = _load_pubkey(props['pubkey']) diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 46a102a..4050d0b 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -25,9 +25,13 @@ from sugar_network import db, node from sugar_network.client import Client, api_url from sugar_network.node import sync, stats_user, files, volume from sugar_network.node.commands import NodeCommands -from sugar_network.toolkit import util, exception, enforce +from sugar_network.toolkit import mountpoints +from sugar_network.toolkit import sugar, coroutine, util, exception, enforce +# Flag file to recognize a directory as a synchronization directory +_SYNC_DIRNAME = '.sugar-network-sync' + _logger = logging.getLogger('node.slave') @@ -51,7 +55,7 @@ class SlaveCommands(NodeCommands): push = [('diff', None, volume.diff(self.volume, self._push_seq)), ('pull', { 'sequence': self._pull_seq, - 'layer': node.layers.value, + 'layer': node.sync_layers.value, }, None), ('files_pull', {'sequence': self._files_seq}, None), ] @@ -67,7 +71,8 @@ class SlaveCommands(NodeCommands): @db.volume_command(method='POST', cmd='offline-sync', permissions=db.ACCESS_LOCAL) def offline_sync(self, path): - enforce(node.layers.value and 'public' not in node.layers.value, + enforce(node.sync_layers.value and + 'public' not in node.sync_layers.value, '--layers is not specified, the full master dump might be ' 'too big and should be limited') enforce(isabs(path), 'Argument \'path\' should be an absolute path') @@ -79,22 +84,18 @@ class SlaveCommands(NodeCommands): os.makedirs(path) try: - self.broadcast({'event': 'sync_start', 'path': path}) self._offline_session = self._offline_sync(path, **(self._offline_session or {})) - except Exception, error: + except Exception: exception(_logger, 'Failed to complete synchronization') - self.broadcast({'event': 'sync_error', 'error': str(error)}) self._offline_session = None raise if self._offline_session is None: _logger.debug('Synchronization completed') - self.broadcast({'event': 'sync_complete'}) else: _logger.debug('Postpone synchronization with %r session', self._offline_session) - self.broadcast({'event': 'sync_continue'}) def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None): push = [] @@ -107,7 +108,7 @@ class SlaveCommands(NodeCommands): session = db.uuid() push.append(('pull', { 'sequence': self._pull_seq, - 'layer': node.layers.value, + 'layer': node.sync_layers.value, }, None)) push.append(('files_pull', {'sequence': self._files_seq}, None)) @@ -171,3 +172,57 @@ class SlaveCommands(NodeCommands): if seq: self._files_seq.exclude(seq) self._files_seq.commit() + + +class PersonalCommands(SlaveCommands): + + def __init__(self, mountset): + SlaveCommands.__init__(self, sugar.uid(), mountset.volume) + + self._mountset = mountset + self._mounts = util.Pool() + self._jobs = coroutine.Pool() + + mountpoints.connect(_SYNC_DIRNAME, + self.__found_mountcb, self.__lost_mount_cb) + + def broadcast(self, event): + self._mountset.broadcast(event) + + def subscribe(self, *args, **kwargs): + return self._mountset.subscribe(*args, **kwargs) + + def _sync_mounts(self): + self.broadcast({'event': 'sync_start'}) + + for mountpoint in self._mounts: + self.broadcast({'event': 'sync_next', 'path': mountpoint}) + try: + self._offline_session = self._offline_sync(mountpoint, + **(self._offline_session or {})) + except Exception, error: + exception(_logger, 'Failed to complete synchronization') + self.broadcast({'event': 'sync_abort', 'error': str(error)}) + self._offline_session = None + raise + + if self._offline_session is None: + _logger.debug('Synchronization completed') + self.broadcast({'event': 'sync_complete'}) + else: + _logger.debug('Postpone synchronization with %r session', + self._offline_session) + self.broadcast({'event': 'sync_paused'}) + + def __found_mountcb(self, path): + self._mounts.add(path) + if self._jobs: + _logger.debug('Found %r sync mount, pool it', path) + else: + _logger.debug('Found %r sync mount, start synchronization', path) + self._jobs.spawn(self._sync_mounts) + + def __lost_mount_cb(self, path): + if self._mounts.remove(path) == util.Pool.ACTIVE: + _logger.warning('%r was unmounted, break synchronization', path) + self._jobs.kill() diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py index a6657d8..f3729b6 100644 --- a/sugar_network/resources/volume.py +++ b/sugar_network/resources/volume.py @@ -162,11 +162,12 @@ class Volume(db.Volume): class Commands(object): def __init__(self): - self._notifier = coroutine.AsyncResult() - self.connect(lambda event: self._notify(event)) + self._pooler = _Pooler() - def connect(self, callback, condition=None, **kwargs): - raise NotImplementedError() + def broadcast(self, event): + _logger.debug('Publish event: %r', event) + self._pooler.notify_all(event) + coroutine.dispatch() @router.route('GET', '/robots.txt') def robots(self, request, response): @@ -185,46 +186,34 @@ class Commands(object): @db.volume_command(method='GET', cmd='subscribe', mime_type='application/json') - def subscribe(self, request=None, response=None, only_commits=False): - """Subscribe to Server-Sent Events. - - :param only_commits: - subscribers can be notified only with "commit" events; - that is useful to minimize interactions between server and clients - - """ + def subscribe(self, request=None, response=None, **condition): + """Subscribe to Server-Sent Events.""" + if request is not None and not condition: + condition = request.query if response is not None: response.content_type = 'text/event-stream' response['Cache-Control'] = 'no-cache' peer = 'anonymous' if hasattr(request, 'environ'): peer = request.environ.get('HTTP_SUGAR_USER') or peer - return self._pull_events(peer, only_commits) + return self._pull_events(peer, condition) - def _pull_events(self, peer, only_commits): + def _pull_events(self, peer, condition): _logger.debug('Start pulling events to %s user', peer) - - yield 'data: %s\n\n' % json.dumps({'event': 'handshake'}) try: while True: - event = self._notifier.get() - if only_commits: - if event['event'] != 'commit': - continue + event = self._pooler.wait() + for key, value in condition.items(): + if value.startswith('!'): + if event.get(key) == value[1:]: + break + elif event.get(key) != value: + break else: - if event['event'] == 'commit': - # Subscribers already got update notifications enough - continue - yield 'data: %s\n\n' % json.dumps(event) + yield 'data: %s\n\n' % json.dumps(event) finally: _logger.debug('Stop pulling events to %s user', peer) - def _notify(self, event): - _logger.debug('Publish event: %r', event) - self._notifier.set(event) - self._notifier = coroutine.AsyncResult() - coroutine.dispatch() - class VolumeCommands(db.VolumeCommands): @@ -315,6 +304,37 @@ class VolumeCommands(db.VolumeCommands): props[name] = url +class _Pooler(object): + """One-producer-to-many-consumers events delivery.""" + + def __init__(self): + self._value = None + self._waiters = 0 + self._ready = coroutine.Event() + self._open = coroutine.Event() + self._open.set() + + def wait(self): + self._open.wait() + self._waiters += 1 + try: + self._ready.wait() + finally: + self._waiters -= 1 + if self._waiters == 0: + self._ready.clear() + self._open.set() + return self._value + + def notify_all(self, value=None): + self._open.wait() + if not self._waiters: + return + self._open.clear() + self._value = value + self._ready.set() + + _HELLO_HTML = """\ <h2>Welcome to Sugar Network API!</h2> Consult <a href="http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network/API"> diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index ee6ba7f..caabc46 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -193,8 +193,8 @@ class Client(object): result = reply.raw return result - def subscribe(self): - return _Subscription(self, _RECONNECTION_NUMBER) + def subscribe(self, **condition): + return _Subscription(self, condition, _RECONNECTION_NUMBER) def _register(self): self.post(['user'], { @@ -214,10 +214,12 @@ class Client(object): class _Subscription(object): - def __init__(self, aclient, tries): + def __init__(self, aclient, condition, tries): self._tries = tries or 1 self._client = aclient - self._response = None + self._content = None + self._params = condition + self._params['cmd'] = 'subscribe' def __iter__(self): while True: @@ -241,7 +243,7 @@ class _Subscription(object): raise exception('Failed to read from %r subscription, ' 'will resubscribe', self._client.api_url) - self._response = None + self._content = None if line.startswith('data: '): try: @@ -251,15 +253,14 @@ class _Subscription(object): line, self._client.api_url) def _handshake(self): - if self._response is not None: - return self._response.raw + if self._content is not None: + return self._content _logger.debug('Subscribe to %r', self._client.api_url) for a_try in reversed(xrange(self._tries)): try: - self._response = self._client.request('GET', - params={'cmd': 'subscribe'}) + response = self._client.request('GET', params=self._params) break except Exception: if a_try == 0: @@ -269,7 +270,8 @@ class _Subscription(object): self._client.api_url, _RECONNECTION_TIMEOUT) coroutine.sleep(_RECONNECTION_TIMEOUT) - return self._response.raw + self._content = response.raw + return self._content def _sign(privkey_path, data): diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index afb9c60..657621e 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -336,18 +336,22 @@ class _Request(Request): self.accept_language = _parse_accept_language( environ.get('HTTP_ACCEPT_LANGUAGE')) self.principal = None + self.query = {} enforce('..' not in self.path, 'Relative url path') query = environ.get('QUERY_STRING') or '' for attr, value in parse_qsl(query): + attr = str(attr) param = self.get(attr) if type(param) is list: param.append(value) - elif param is not None: - self[str(attr)] = [param, value] else: - self[str(attr)] = value + if param is not None: + value = [param, value] + self[attr] = value + if attr != 'cmd': + self.query[attr] = value if query: self.url += '?' + query diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py index 728a4ed..8171950 100644 --- a/sugar_network/toolkit/util.py +++ b/sugar_network/toolkit/util.py @@ -533,50 +533,53 @@ class PersistentSequence(Sequence): os.fsync(f.fileno()) -class MutableStack(object): +class Pool(object): """Stack that keeps its iterators correct after changing content.""" + QUEUED = 0 + ACTIVE = 1 + PASSED = 2 + def __init__(self): self._queue = collections.deque() def add(self, value): self.remove(value) - self._queue.appendleft([False, value]) + self._queue.appendleft([Pool.QUEUED, value]) def remove(self, value): - for i, (__, existing) in enumerate(self._queue): + for i, (state, existing) in enumerate(self._queue): if existing == value: del self._queue[i] - break + return state + + def get_state(self, value): + for state, existing in self._queue: + if existing == value: + return state def rewind(self): for i in self._queue: - i[0] = False + i[0] = Pool.QUEUED def __len__(self): return len(self._queue) def __iter__(self): - return _MutableStackIterator(self._queue) + for i in self._queue: + state, value = i + if state == Pool.PASSED: + continue + try: + i[0] = Pool.ACTIVE + yield value + finally: + i[0] = Pool.PASSED def __repr__(self): return str([i[1] for i in self._queue]) -class _MutableStackIterator(object): - - def __init__(self, queue): - self._queue = queue - - def next(self): - for i in self._queue: - processed, value = i - if not processed: - i[0] = True - return value - raise StopIteration() - - class _NullHandler(logging.Handler): def emit(self, record): diff --git a/tests/__init__.py b/tests/__init__.py index c62661a..74811a5 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -83,7 +83,7 @@ class Test(unittest.TestCase): node.data_root.value = tmpdir node.static_url.value = None node.files_root.value = None - node.layers.value = None + node.sync_layers.value = None db.index_write_queue.value = 10 client.local_root.value = tmpdir client.activity_dirs.value = [tmpdir + '/Activities'] diff --git a/tests/integration/master_slave.py b/tests/integration/master_slave.py index 9d35ed8..0b1fb87 100755 --- a/tests/integration/master_slave.py +++ b/tests/integration/master_slave.py @@ -45,7 +45,7 @@ class MasterSlaveTest(tests.Test): '-DDD', '--rundir=slave/run', '--files-root=slave/files', '--stats-root=slave/stats', '--stats-user', '--stats-user-step=1', '--stats-user-rras=RRA:AVERAGE:0.5:1:100', - '--index-flush-threshold=1', '--layers=pilot', + '--index-flush-threshold=1', '--sync-layers=pilot', ]) coroutine.sleep(2) @@ -71,6 +71,7 @@ class MasterSlaveTest(tests.Test): 'summary': 'summary', 'description': 'description', 'preview': 'preview1', + 'layer': 'pilot', }) self.touch(('master/files/file1', 'file1')) @@ -80,6 +81,7 @@ class MasterSlaveTest(tests.Test): 'summary': 'summary', 'description': 'description', 'preview': 'preview2', + 'layer': 'pilot', }) slave.post(['user', tests.UID], { 'name': 'db', @@ -119,6 +121,7 @@ class MasterSlaveTest(tests.Test): 'summary': 'summary', 'description': 'description', 'preview': 'preview3', + 'layer': 'pilot', }) master.put(['context', context1, 'title'], 'title1_') master.put(['context', context2, 'preview'], 'preview2_') @@ -131,6 +134,7 @@ class MasterSlaveTest(tests.Test): 'summary': 'summary', 'description': 'description', 'preview': 'preview4', + 'layer': 'pilot', }) slave.put(['context', context2, 'title'], 'title2_') slave.put(['context', context1, 'preview'], 'preview1_') diff --git a/tests/units/client/home_mount.py b/tests/units/client/home_mount.py index 1014756..6735311 100755 --- a/tests/units/client/home_mount.py +++ b/tests/units/client/home_mount.py @@ -164,7 +164,7 @@ class HomeMountTest(tests.Test): events = [] def read_events(): - for event in local.subscribe(): + for event in local.subscribe(event='!commit'): if 'props' in event: event.pop('props') events.append(event) @@ -186,7 +186,6 @@ class HomeMountTest(tests.Test): job.kill() self.assertEqual([ - {'event': 'handshake'}, {'guid': guid, 'document': 'context', 'event': 'create', 'mountpoint': '~'}, {'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~'}, {'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '~'}, @@ -206,7 +205,7 @@ class HomeMountTest(tests.Test): }) def read_events(): - for event in local.subscribe(): + for event in IPCClient().subscribe(event='!commit'): events.append(event) job = coroutine.spawn(read_events) coroutine.sleep(.1) @@ -218,7 +217,6 @@ class HomeMountTest(tests.Test): job.kill() self.assertEqual([ - {'event': 'handshake'}, {'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~', 'props': {'title': 'title_2'}}, {'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '/', 'props': {'favorite': True}}, {'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '~', 'props': {'favorite': True}}, diff --git a/tests/units/client/mountset.py b/tests/units/client/mountset.py index 0fe34f4..5e4eda3 100755 --- a/tests/units/client/mountset.py +++ b/tests/units/client/mountset.py @@ -55,8 +55,7 @@ class MountsetTest(tests.Test): if 'props' in event: event.pop('props') self.events.append(event) - if event['event'] != 'handshake': - self.mounted.set() + self.mounted.set() coroutine.dispatch() self.events_job = coroutine.spawn(read_events) @@ -77,7 +76,6 @@ class MountsetTest(tests.Test): self.assertEqual( sorted([ - {'event': 'handshake'}, {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'}, ]), @@ -100,7 +98,6 @@ class MountsetTest(tests.Test): coroutine.sleep(.5) self.assertEqual([ - {'event': 'handshake'}, {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, ], self.events) @@ -116,7 +113,6 @@ class MountsetTest(tests.Test): self.mounted.clear() self.assertEqual([ - {'event': 'handshake'}, {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'}, ], @@ -169,7 +165,6 @@ class MountsetTest(tests.Test): self.mounted.clear() self.assertEqual([ - {'event': 'handshake'}, {'mountpoint': tests.tmpdir + '/mnt', 'event': 'mount', 'private': False, 'name': 'mnt'}, ], self.events) diff --git a/tests/units/client/node_mount.py b/tests/units/client/node_mount.py index e7bdcda..349c6e3 100755 --- a/tests/units/client/node_mount.py +++ b/tests/units/client/node_mount.py @@ -65,8 +65,7 @@ class NodeMountTest(tests.Test): if 'props' in event: event.pop('props') events.append(event) - if event['event'] != 'handshake': - got_event.set() + got_event.set() job = coroutine.spawn(read_events) guid = remote.post(['context'], { @@ -78,7 +77,6 @@ class NodeMountTest(tests.Test): got_event.wait() got_event.clear() self.assertEqual([ - {'event': 'handshake'}, {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1}, ], events) diff --git a/tests/units/client/remote_mount.py b/tests/units/client/remote_mount.py index bd754e1..9813f94 100755 --- a/tests/units/client/remote_mount.py +++ b/tests/units/client/remote_mount.py @@ -35,7 +35,7 @@ class RemoteMountTest(tests.Test): events = [] def read_events(): - for event in remote.subscribe(): + for event in remote.subscribe(event='!commit'): if 'props' in event: event.pop('props') events.append(event) @@ -57,8 +57,6 @@ class RemoteMountTest(tests.Test): job.kill() self.assertEqual([ - {'event': 'handshake'}, - {'event': 'handshake', 'mountpoint': '/'}, {'guid': guid, 'document': 'context', 'event': 'create', 'mountpoint': '/'}, {'guid': guid, 'document': 'context', 'event': 'update', 'mountpoint': '/'}, {'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '/'}, @@ -92,9 +90,7 @@ class RemoteMountTest(tests.Test): self.assertEqual(True, remote.get(cmd='mounted')) self.assertEqual([ - {'event': 'handshake'}, {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, - {'event': 'handshake', 'mountpoint': '/'}, ], events) del events[:] @@ -117,7 +113,6 @@ class RemoteMountTest(tests.Test): self.assertEqual(True, remote.get(cmd='mounted')) self.assertEqual([ {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, - {'event': 'handshake', 'mountpoint': '/'}, ], events) del events[:] diff --git a/tests/units/node/sync_offline.py b/tests/units/node/sync_offline.py index cf8091d..f6b8ce3 100755 --- a/tests/units/node/sync_offline.py +++ b/tests/units/node/sync_offline.py @@ -36,7 +36,7 @@ class SyncOfflineTest(tests.Test): statvfs.f_bfree = 999999999 stats_user.stats_user_step.value = 1 stats_user.stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100'] - node.layers.value = 'pilot' + node.sync_layers.value = 'pilot' def next_uuid(self): self.uuid += 1 @@ -50,13 +50,13 @@ class SyncOfflineTest(tests.Test): volume = Volume('node', [Document]) cp = SlaveCommands('node', volume) - node.layers.value = None + node.sync_layers.value = None self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt') - node.layers.value = 'public' + node.sync_layers.value = 'public' self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt') - node.layers.value = ['public'] + node.sync_layers.value = ['public'] self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt') - node.layers.value = ['public', 'foo'] + node.sync_layers.value = ['public', 'foo'] self.assertRaises(RuntimeError, cp.offline_sync, tests.tmpdir + '/mnt') def test_Export(self): diff --git a/tests/units/resources/volume.py b/tests/units/resources/volume.py index 0ea3fb4..afc725f 100755 --- a/tests/units/resources/volume.py +++ b/tests/units/resources/volume.py @@ -48,7 +48,7 @@ class VolumeTest(tests.Test): events = [] def read_events(): - for event in cp.subscribe(Request(), db.Response()): + for event in cp.subscribe(event='!commit'): if not event.strip(): continue assert event.startswith('data: ') @@ -71,14 +71,13 @@ class VolumeTest(tests.Test): job.kill() self.assertEqual([ - {'event': 'handshake'}, {'guid': 'guid', 'document': 'document', 'event': 'create'}, {'guid': 'guid', 'document': 'document', 'event': 'update'}, {'guid': 'guid', 'event': 'delete', 'document': u'document'}, ], events) - def test_SubscribeToOnlyCommits(self): + def __test_SubscribeCondition(self): class Document(Resource): @@ -114,7 +113,6 @@ class VolumeTest(tests.Test): job.kill() self.assertEqual([ - {'event': 'handshake'}, {'document': 'document', 'event': 'commit'}, {'document': 'document', 'event': 'commit'}, {'document': 'document', 'event': 'commit'}, @@ -498,9 +496,7 @@ class TestCommands(VolumeCommands, Commands): def __init__(self, volume): VolumeCommands.__init__(self, volume) Commands.__init__(self) - - def connect(self, callback, condition=None, **kwargs): - self.volume.connect(callback, condition) + self.volume.connect(self.broadcast) def call(cp, principal=None, content=None, **kwargs): diff --git a/tests/units/toolkit/util.py b/tests/units/toolkit/util.py index 44bb9e4..ada713d 100755 --- a/tests/units/toolkit/util.py +++ b/tests/units/toolkit/util.py @@ -363,6 +363,64 @@ class UtilTest(tests.Test): self.assertEqual(['\n', 'b'], readlines('\nb')) self.assertEqual([' \n', ' b \n'], readlines(' \n b \n')) + def test_Pool(self): + stack = util.Pool() + + stack.add('a') + stack.add('b') + stack.add('c') + + self.assertEqual(util.Pool.QUEUED, stack.get_state('a')) + self.assertEqual(util.Pool.QUEUED, stack.get_state('b')) + self.assertEqual(util.Pool.QUEUED, stack.get_state('c')) + self.assertEqual( + [('c', util.Pool.ACTIVE), ('b', util.Pool.ACTIVE), ('a', util.Pool.ACTIVE)], + [(i, stack.get_state(i)) for i in stack]) + self.assertEqual( + [], + [i for i in stack]) + self.assertEqual(util.Pool.PASSED, stack.get_state('a')) + self.assertEqual(util.Pool.PASSED, stack.get_state('b')) + self.assertEqual(util.Pool.PASSED, stack.get_state('c')) + + stack.rewind() + self.assertEqual(util.Pool.QUEUED, stack.get_state('a')) + self.assertEqual(util.Pool.QUEUED, stack.get_state('b')) + self.assertEqual(util.Pool.QUEUED, stack.get_state('c')) + self.assertEqual( + ['c', 'b', 'a'], + [i for i in stack]) + + stack.add('c') + self.assertEqual(util.Pool.QUEUED, stack.get_state('c')) + self.assertEqual( + [('c', util.Pool.ACTIVE)], + [(i, stack.get_state(i)) for i in stack]) + self.assertEqual(util.Pool.PASSED, stack.get_state('c')) + + stack.add('b') + stack.add('a') + self.assertEqual( + ['a', 'b'], + [i for i in stack]) + + stack.rewind() + self.assertEqual( + ['a', 'b', 'c'], + [i for i in stack]) + + stack.add('d') + self.assertEqual(util.Pool.QUEUED, stack.get_state('d')) + self.assertEqual( + [('d', util.Pool.ACTIVE)], + [(i, stack.get_state(i)) for i in stack]) + self.assertEqual(util.Pool.PASSED, stack.get_state('d')) + + stack.rewind() + self.assertEqual( + ['d', 'a', 'b', 'c'], + [i for i in stack]) + if __name__ == '__main__': tests.main() |