diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-09-17 11:01:46 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-09-17 11:01:46 (GMT) |
commit | 039f54130fdd472b0a40c834f67e2fc7248ad564 (patch) | |
tree | e49bacc21ccaa0e1e4f752a03e4d3e1c1bcc9fe9 | |
parent | 105b730aa27977769375e6c09c50d7141eae0df2 (diff) |
Polish design
* only RESTful and DBus API for IPC;
* use SSE for subscriptions
45 files changed, 1322 insertions, 3148 deletions
diff --git a/examples/activities.py b/examples/activities.py deleted file mode 100755 index 875728c..0000000 --- a/examples/activities.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python - -import sugar_network - - -def main(): - client = sugar_network.Client('/') - client.checkin('com.ywwg.CartoonBuilderActivity') - - -if __name__ == '__main__': - import os - from sugar_network import local - - os.system('sugar-network-service -DD start ' - '--local-root=tmp ' - '--activity-dirs=tmp/Activities ' - '--api-url=http://localhost:8000') - try: - local.local_root.value = 'tmp' - local.api_url.value = 'http://localhost:8000' - local.activity_dirs.value = ['tmp/Activities'] - main() - finally: - os.system('sugar-network-service --local-root=tmp stop') diff --git a/examples/client.py b/examples/client.py deleted file mode 100755 index 895306a..0000000 --- a/examples/client.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python - -import sugar_network - - -def main(): - client = sugar_network.Client('~') - - guids = [None] * 3 - titles = ['Title1', 'Title2', 'Title3'] - - print '-- Delete objects' - for guid in [i['guid'] for i in client.Context.cursor()]: - client.Context.delete(guid) - - def context_new(title): - context = client.Context() - context['type'] = 'activity' - context['title'] = title - context['summary'] = 'Description' - context['description'] = 'Description' - context.post() - return context['guid'] - - print '-- Create new objects' - guids[0] = context_new(titles[0]) - assert guids[0] - guids[1] = context_new(titles[1]) - assert guids[1] and guids[1] != guids[0] - guids[2] = context_new(titles[2]) - assert guids[2] and guids[2] != guids[1] and guids[2] != guids[0] - - print '-- Browse using iterators' - for i, obj in enumerate(client.Context.cursor()): - assert i == obj.offset - assert obj['guid'] == guids[i] - - print '-- Browse by offset' - query = client.Context.cursor() - for i in range(query.total): - assert query[i]['guid'] == guids[i] - - print '-- Get objects directly' - assert client.Context(guids[0], reply=['title'])['title'] == titles[0] - assert client.Context(guids[1], reply=['title'])['title'] == titles[1] - assert client.Context(guids[2], reply=['title'])['title'] == titles[2] - - print '-- Set BLOB property' - client.Context(guids[1]).set_blob('icon', 'string') - - print '-- Get BLOB property' - assert client.Context(guids[1]).get_blob('icon').read() == 'string' - - print '-- Query by property value' - for obj in client.Context.cursor(title='Title2', reply=['guid', 'title']): - assert obj['guid'] == guids[1] - assert obj['title'] == titles[1] - - print '-- Full text search query' - query = client.Context.cursor('Title1 OR Title3', reply=['guid', 'title']) - assert query.total == 2 - - assert sorted([(guids[0], titles[0]), (guids[2], titles[2])]) == \ - sorted([(i['guid'], i['title']) for i in query]) - - -if __name__ == '__main__': - import os - from sugar_network import local - - os.system('sugar-network-service -DD start ' - '--local-root=tmp ' - '--activity-dirs=tmp/Activities ' - '--api-url=http://localhost:8000') - try: - local.local_root.value = 'tmp' - main() - finally: - os.system('sugar-network-service --local-root=tmp stop') diff --git a/sugar-network-server b/sugar-network-server index 1878faf..847f716 100755 --- a/sugar-network-server +++ b/sugar-network-server @@ -24,16 +24,15 @@ import active_document as ad import sugar_network_webui as webui from active_toolkit import coroutine, application from active_toolkit.options import Option -from sugar_network import node, local, Client +from sugar_network import node, local from sugar_network.local.mounts import LocalMount from sugar_network.local.mountset import Mountset from sugar_network.local.mounts import LocalMount from sugar_network.node import stats -from sugar_network.node.router import Router from sugar_network.node.commands import MasterCommands -from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.resources.volume import Volume from sugar_network.toolkit import sugar, sneakernet +from sugar_network.toolkit.router import Router class Application(application.Daemon): @@ -53,16 +52,13 @@ class Application(application.Daemon): volume = Volume(node.data_root.value) self.jobs.spawn(volume.populate) - subscriber = SubscribeSocket(volume, - node.host.value, node.subscribe_port.value) - cp = MasterCommands(volume, subscriber) + cp = MasterCommands(volume) logging.info('Listening for requests on %s:%s', node.host.value, node.port.value) server = coroutine.WSGIServer((node.host.value, node.port.value), Router(cp), **ssl_args) self.jobs.spawn(server.serve_forever) - self.jobs.spawn(subscriber.serve_forever) if webui.webui.value: # XXX Until implementing regular web users @@ -74,11 +70,10 @@ class Application(application.Daemon): local.mounts_root.value = None mountset = Mountset(None) mountset['/'] = LocalMount(volume) - Client.connection = mountset host = (webui.webui_host.value, webui.webui_port.value) logging.info('Start Web server on %s:%s port', *host) - server = coroutine.WSGIServer(host, webui.get_app()) + server = coroutine.WSGIServer(host, webui.get_app(mountset)) self.jobs.spawn(server.serve_forever) try: diff --git a/sugar-network-service b/sugar-network-service index 7bb8d23..d461457 100755 --- a/sugar-network-service +++ b/sugar-network-service @@ -29,15 +29,15 @@ from os.path import join, abspath, exists import active_document as ad import sugar_network_webui as webui -from sugar_network import toolkit, local, node, Client +from sugar_network import toolkit, local, node from sugar_network.toolkit import sugar, sneakernet from sugar_network.toolkit import dbus_thread, mounts_monitor from sugar_network.local import activities, datastore from sugar_network.local.dbus_datastore import Datastore from sugar_network.local.dbus_network import Network -from sugar_network.local.bus import IPCServer from sugar_network.local.mounts import HomeMount, RemoteMount from sugar_network.local.mountset import Mountset +from sugar_network.local.ipc_client import Router from sugar_network.node import stats from sugar_network.resources.volume import Volume from active_toolkit.options import Option @@ -154,72 +154,51 @@ class Application(application.Application): @application.command( 'start service and log to files', name='start', keep_stdout=True) - def _start(self, minimal=False): + def _start(self): if self.check_for_instance(): printf.info('%s is already run', self.name) exit(1) - if local.anonymous.value: - sugar.uid = lambda: 'anonymous' - sugar.nickname = lambda: 'anonymous' - sugar.color = lambda: '#000000,#000000' - else: - # In case if it is new Sugar Shell profile - toolkit.ensure_dsa_pubkey(sugar.profile_path('owner.key')) - jobs = coroutine.Pool() - - volume = Volume(self._db_path, lazy_open=local.lazy_open.value) - mountset = Mountset(volume) - mountset['~'] = HomeMount(volume) - mountset['/'] = RemoteMount(volume) - - # Point this process code to `mountset` directly passing over IPC - Client.connection = mountset + mountset = self._mountset() def delayed_start(event=None): logging.info('Proceed delayed start') mountset.disconnect(delayed_start) - volume.populate() - datastore.populate(volume['artifact']) + mountset.home_volume.populate() + datastore.populate(mountset.home_volume['artifact']) + self._sync(mountset.home_volume) - stale_keep_impls = [] - Context = Client('~').Context - for context in Context.cursor(keep_impl=[1, 2]): - if not activities.ensure_checkins(context.guid): - stale_keep_impls.append(context.guid) - for guid in stale_keep_impls: - Context(guid, keep_impl=0).post() + logging.info('Listening for IPC requests on %s port', + local.ipc_port.value) + server = coroutine.WSGIServer(('localhost', local.ipc_port.value), + Router(mountset)) + jobs.spawn(server.serve_forever) - jobs.spawn(activities.monitor, volume['context'], + jobs.spawn(activities.monitor, mountset.home_volume['context'], local.activity_dirs.value) if webui.webui.value: host = (webui.webui_host.value, webui.webui_port.value) logging.info('Start Web server on %s:%s', *host) - server = coroutine.WSGIServer(host, webui.get_app()) + server = coroutine.WSGIServer(host, webui.get_app(mountset)) jobs.spawn(server.serve_forever) if local.mounts_root.value: mounts_monitor.start(abspath(local.mounts_root.value)) - if not minimal: - if local.delayed_start.value: - mountset.connect(delayed_start, event='delayed-start') - else: - delayed_start() - dbus_thread.spawn_service(Datastore) - dbus_thread.spawn_service(Network) + if local.delayed_start.value: + mountset.connect(delayed_start, event='delayed-start') + else: + delayed_start() + dbus_thread.spawn_service(Datastore) + dbus_thread.spawn_service(Network) pid_path = self.new_instance() try: mountset.open() - jobs.spawn(IPCServer(mountset).serve_forever) - if minimal: - jobs.join() - else: - dbus_thread.start(mountset) + dbus_thread.start(mountset) except KeyboardInterrupt: util.exception('%s interrupted', self.name) except Exception: @@ -230,6 +209,22 @@ class Application(application.Application): mountset.close() os.unlink(pid_path) + def _mountset(self): + if local.anonymous.value: + sugar.uid = lambda: 'anonymous' + sugar.nickname = lambda: 'anonymous' + sugar.color = lambda: '#000000,#000000' + else: + # In case if it is new Sugar Shell profile + toolkit.ensure_dsa_pubkey(sugar.profile_path('owner.key')) + + volume = Volume(self._db_path, lazy_open=local.lazy_open.value) + mountset = Mountset(volume) + mountset['~'] = HomeMount(volume) + mountset['/'] = RemoteMount(volume) + + return mountset + @contextmanager def _rendezvous(self): @@ -248,21 +243,26 @@ class Application(application.Application): self._events['sync_complete'] = coroutine.Event() - server = None + pid_path = None + mountset = None try: if not self.check_for_instance(): printf.info('%s is not started, ' \ 'launch it for this command only', self.name) - server = coroutine.spawn(self._start, True) + pid_path = self.new_instance() + mountset = self._mountset() + dbus_thread.spawn_service(Network) + coroutine.spawn(dbus_thread.start, mountset) coroutine.dispatch() - Client.connection.opened.wait() + mountset.opened.wait() Client.connect(events_cb) yield finally: - if server is not None: - server.kill() + if mountset is not None: + mountset.close() + os.unlink(pid_path) def _call(self, method, content=None): kwargs = {} @@ -300,6 +300,12 @@ class Application(application.Application): raise break + def _sync(self, volume): + docs, __ = volume['context'].find(limit=ad.MAX_LIMIT, keep_impl=[1, 2]) + for context in docs: + if not activities.ensure_checkins(context.guid): + contexts.update(context.guid, keep_impl=0) + locale.setlocale(locale.LC_ALL, '') @@ -314,7 +320,7 @@ local.tmpdir.value = sugar.profile_path('tmp') Option.seek('main', [application.debug]) Option.seek('webui', webui) Option.seek('local', local) -Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs]) +Option.seek('node', [node.port, node.sync_dirs]) Option.seek('stats', stats) Option.seek('active-document', ad) diff --git a/sugar_network/__init__.py b/sugar_network/__init__.py index de95e09..7ace3f2 100644 --- a/sugar_network/__init__.py +++ b/sugar_network/__init__.py @@ -14,14 +14,26 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. from sugar_network.toolkit import sugar -from sugar_network.client.bus import Client, ServerError from sugar_network.local.activities import checkins from sugar_network.local import api_url, server_mode from sugar_network_webui import webui_port from sugar_network.zerosugar.injector import launch, checkin +def Client(url=None, **kwargs): + from sugar_network.toolkit import http + if url is None: + url = api_url.value + return http.Client(url, **kwargs) + + +def IPCClient(**kwargs): + from sugar_network.toolkit import http + from sugar_network.local import ipc_port + return http.Client('http://localhost:%s' % ipc_port.value, **kwargs) + + def DBusClient(*args, **kwargs): # Avoid importing dbus related modules by default - from sugar_network.client import dbus_client + from sugar_network.local import dbus_client return dbus_client.DBusClient(*args, **kwargs) diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py deleted file mode 100644 index ee29047..0000000 --- a/sugar_network/client/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/sugar_network/client/bus.py b/sugar_network/client/bus.py deleted file mode 100644 index 2fb4b68..0000000 --- a/sugar_network/client/bus.py +++ /dev/null @@ -1,303 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import json -import socket -import logging -from contextlib import contextmanager - -import active_document as ad -from active_toolkit import util, coroutine, sockets -from sugar_network import local -from sugar_network.toolkit import ipc, sugar - - -_CONNECTION_POOL = 6 - -_logger = logging.getLogger('sugar_network') - - -class ServerError(RuntimeError): - pass - - -class classproperty(property): - - def __get__(self, obj, type_): - # pylint: disable-msg=E1101 - return self.fget.__get__(None, type_)() - - -class Client(object): - """IPC class to get access from a client side. - - See http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network/Client - for detailed information. - - """ - _connection = None - - @classproperty - @classmethod - def connection(cls): - if cls._connection is None: - cls._connection = _Connection() - return cls._connection - - @classmethod - def close(cls): - cls._connection = None - - @classmethod - def call(cls, method, cmd=None, content=None, - content_type='application/json', **kwargs): - request = ad.Request(kwargs) - request.access_level = ad.ACCESS_LOCAL - request.principal = sugar.uid() - request['method'] = method - if cmd: - request['cmd'] = cmd - request.content = content - request.content_type = content_type - return cls.connection.call(request) - - @classmethod - def publish(cls, event, **kwargs): - kwargs['event'] = event - cls.connection.publish(kwargs) - - @classmethod - def connect(cls, callback, **condition): - cls.connection.connect(callback, condition) - - @classmethod - def disconnect(cls, callback): - cls.connection.disconnect(callback) - - @classmethod - def subscribe(cls): - """Start subscription session. - - :returns: - `SocketFile` object connected to IPC server to read events from - - """ - ipc.rendezvous() - # pylint: disable-msg=E1101 - conn = sockets.SocketFile(coroutine.socket(socket.AF_UNIX)) - conn.connect(local.ensure_path('run', 'subscribe')) - return conn - - @classmethod - def mounts(cls): - return cls.call('GET', 'mounts') - - def __init__(self, mountpoint): - self._mountpoint = mountpoint - self._resources = {} - - @property - def connected(self): - _logger.warning('Client.connected is depecated, ' - 'use Client.mounted instead') - return self.call('GET', 'mounted', mountpoint=self._mountpoint) - - @property - def mounted(self): - return self.call('GET', 'mounted', mountpoint=self._mountpoint) - - def launch(self, context, command='activity', object_id=None, uri=None, - args=None): - """Launch context implementation. - - Function will call fork at the beginning. In forked process, - it will try to choose proper implementation to execute and launch it. - - Execution log will be stored in `~/.sugar/PROFILE/logs` directory. - - :param context: - context GUID to look for implementations - :param command: - command that selected implementation should support - :param object_id: - optional id to restore Journal object - :param uri: - optional uri to open; if implementation supports it - :param args: - optional list of arguments to pass to launching implementation - - """ - # TODO Make a diference in launching from "~" and "/" mounts - self.publish('launch', mountpoint=self._mountpoint, context=context, - command=command, object_id=object_id, uri=uri, args=args) - - def __getattr__(self, name): - """Class-like object to access to a resource or call a method. - - :param name: - resource name started with capital char - :returns: - a class-like resource object - - """ - resource = self._resources.get(name) - if resource is None: - resource = _Resource(self._mountpoint, name.lower()) - self._resources[name] = resource - return resource - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - -class _Connection(object): - - def __init__(self): - self._pool = coroutine.Queue(maxsize=_CONNECTION_POOL) - self._pool_size = 0 - self._subscribe_job = None - self._subscriptions = {} - - def call(self, request, response=None): - with self._pipe() as pipe: - request['content_type'] = request.content_type - pipe.write_message(request) - if request.content_type == 'application/json': - request.content = json.dumps(request.content) - pipe.write(request.content) - reply = pipe.read_message() - - _logger.debug('Made a call: request=%r reply=%r', request, reply) - - if type(reply) is dict and 'error' in reply: - raise ServerError(reply['error']) - - return reply - - def connect(self, callback, condition=None): - if self._subscribe_job is None: - self._subscribe_job = coroutine.spawn(self._subscribe) - self._subscriptions[callback] = condition or {} - - def disconnect(self, callback): - if callback in self._subscriptions: - del self._subscriptions[callback] - - def publish(self, event): - request = ad.Request() - request['method'] = 'POST' - request['cmd'] = 'publish' - request.content = event - request.content_type = 'application/json' - self.call(request) - - def close(self): - if self._subscribe_job is not None: - _logger.debug('Stop waiting for events') - self._subscribe_job.kill() - self._subscribe_job = None - - while not self._pool.empty(): - conn = self._pool.get_nowait() - try: - _logger.debug('Close IPC connection: %r', conn) - conn.close() - except Exception: - util.exception(_logger, 'Cannot close IPC connection') - - @contextmanager - def _pipe(self): - if self._pool.qsize() or self._pool_size >= self._pool.maxsize: - conn = self._pool.get() - else: - self._pool_size += 1 - ipc.rendezvous() - # pylint: disable-msg=E1101 - conn = coroutine.socket(socket.AF_UNIX) - conn.connect(local.ensure_path('run', 'accept')) - _logger.debug('Open new IPC connection: %r', conn) - try: - yield sockets.SocketFile(conn) - finally: - self._pool.put(conn) - - def _subscribe(self): - _logger.debug('Start waiting for events') - - conn = Client.subscribe() - try: - while True: - coroutine.select([conn.fileno()], [], []) - event = conn.read_message() - if event is None: - break - for callback, condition in self._subscriptions.items(): - for key, value in condition.items(): - if event.get(key) != value: - break - else: - callback(event) - finally: - conn.close() - - -class _Resource(object): - - def __init__(self, mountpoint, name): - self.mountpoint = mountpoint - self.document = name - - def cursor(self, query=None, order_by=None, reply=None, page_size=18, - **filters): - """Query resource objects. - - :param query: - full text search query string in Xapian format - :param order_by: - name of property to sort by; might be prefixed by either `+` or `-` - to change order's direction - :param reply: - list of property names to return for found objects; - by default, only GUIDs will be returned; for missed properties, - will be sent additional requests to a server on getting access - to particular object. - :param page_size: - number of items in one cached page, there are might be several - (at least two) pages - :param filters: - a dictionary of properties to filter resulting list - - """ - from sugar_network.client.cursor import Cursor - return Cursor(self.mountpoint, self.document, query, order_by, reply, - page_size, **filters) - - def delete(self, guid): - """Delete resource object. - - :param guid: - resource object's GUID - - """ - return Client.call('DELETE', - mountpoint=self.mountpoint, document=self.document, guid=guid) - - def __call__(self, guid=None, reply=None, **kwargs): - from sugar_network.client.objects import Object - return Object(self.mountpoint, self.document, reply or [], guid, - **kwargs) diff --git a/sugar_network/client/cursor.py b/sugar_network/client/cursor.py deleted file mode 100644 index 311a117..0000000 --- a/sugar_network/client/cursor.py +++ /dev/null @@ -1,293 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import logging -import collections - -from sugar_network.client.bus import Client -from sugar_network.client.objects import Object -from active_toolkit import coroutine, util, enforce - - -_QUERY_PAGES_NUMBER = 2 - - -_logger = logging.getLogger('sugar_network.cursor') - - -class Cursor(object): - - def __init__(self, mountpoint, document, query, order_by, reply, page_size, - **filters): - self.mountpoint = mountpoint - self.document = document - self._query = query - self._order_by = order_by - self._reply = reply or ['guid'] - if 'guid' not in self._reply: - self._reply.append('guid') - self._page_size = page_size - self._filters = filters or {} - self._total = None - self._page_access = collections.deque([], _QUERY_PAGES_NUMBER) - self._pages = {} - self._offset = -1 - self._wait_session = None - - Client.connect(self.__event_cb) - - def close(self): - Client.disconnect(self.__event_cb) - - # pylint: disable-msg=E1101,E0102,E0202 - @property - def offset(self): - """Current position in query results.""" - return self._offset - - @offset.setter - def offset(self, value): - """Change current position in query results.""" - self._offset = max(-1, value) - - @property - def total(self): - """Total number of objects.""" - if self._total is None: - if not self._fetch_page(0): - return 0 - return self._total - - @property - def order_by(self): - """Current order of resulting list. - - Name of property to sort by. Might be prefixed by either `+` or `-` - to change order's direction. - - """ - return self._order_by - - # pylint: disable-msg=E1101,E0102 - @order_by.setter - def order_by(self, value): - if self._order_by == value: - return - self._order_by = value - self._reset() - - def read_events(self): - if self._wait_session is None: - self._wait_session = _WaitSession() - - with self._wait_session as session: - while session.wait(): - for event in session: - if event['event'] == 'commit': - # TODO If cursor formed by fulltext query, - # it should refreshed as well - continue - # TODO Replace by changed offset - self._reset() - yield None - - def __iter__(self): - while self.offset + 1 < self.total: - self.offset += 1 - obj = self.get(self.offset) - if obj is None: - break - yield obj - self.offset = -1 - - def filter(self, query=None, **filters): - """Change query parameters. - - :param query: - full text search query string in Xapian format - :param filters: - a dictionary of properties to filter resulting list - - """ - if query == self._query and filters == self._filters: - return - self._query = query - self._filters = filters - self._reset() - - def update_filter(self, query=None, **filters): - """Update query parameters applying them on top of existing. - - :param query: - full text search query string in Xapian format - :param filters: - a dictionary of properties to filter resulting list - - """ - new_filters = self._filters.copy() - new_filters.update(filters) - self.filter(query, **new_filters) - - def get(self, key, default=None): - """Get either object by key or default value. - - :param key: - `key` value might be an `int` value (offset within the cursor), - or a string to treat it as GUID - :param default: - value to return if key if not found - :returns: - `Object` value or `default` - - """ - if type(key) is not int: - for page in self._pages.values(): - for obj in page: - if obj is not None and obj.guid == key: - return obj - return Object(self.mountpoint, self.document, self._reply, key) - else: - offset = key - - if offset < 0 or self._total is not None and \ - (offset >= self._total): - return default - - page = offset / self._page_size - offset -= page * self._page_size - - if page not in self._pages: - if not self._fetch_page(page): - return default - - if offset >= len(self._pages[page]): - total = page + len(self._pages[page]) - _logger.warning('Miscalculated total number, %s instead of %s', - total, self._total) - self._total = total - return default - - return self._pages[page][offset] - - def __getitem__(self, key): - """Get object by key. - - :param key: - `key` value might be an `int` value (offset within the cursor), - or a string to treat it as GUID - :returns: - `Object` value or raise `KeyError` exception if key is not found - - """ - result = self.get(key) - enforce(result is not None, KeyError, 'Key is out of range') - return result - - def _fetch_page(self, page): - offset = page * self._page_size - - params = { - 'mountpoint': self.mountpoint, - 'document': self.document, - } - for key, value in self._filters.items(): - if value is not None: - params[key] = value - params['offset'] = offset - params['limit'] = self._page_size - if self._query: - params['query'] = self._query - if self._order_by: - params['order_by'] = self._order_by - if self._reply: - params['reply'] = self._reply - - try: - response = Client.call('GET', **params) - self._total = response['total'] - except Exception: - util.exception(_logger, 'Failed to fetch %r query', params) - self._total = None - return False - - result = [None] * len(response['result']) - for i, props in enumerate(response['result']): - result[i] = Object(self.mountpoint, self.document, self._reply, - props['guid'], props, offset + i) - - if not self._page_access or self._page_access[-1] != page: - if len(self._page_access) == _QUERY_PAGES_NUMBER: - del self._pages[self._page_access[0]] - self._page_access.append(page) - self._pages[page] = result - - return True - - def _reset(self): - self._page_access.clear() - self._pages.clear() - self._total = None - - def __event_cb(self, event): - mountpoint = event.get('mountpoint') - document = event.get('document') - if document == self.document and mountpoint == self.mountpoint or \ - document == self.document and not mountpoint or \ - mountpoint == self.mountpoint and \ - event['event'] in ('mount', 'unmount'): - self._reset() - if self._wait_session is not None: - self._wait_session.push(event) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - -class _WaitSession(object): - - def __init__(self): - self._signal = coroutine.Event() - self._users = 0 - self._queue = collections.deque() - - def __enter__(self): - if self._users: - # Break previous waiting session, - # only one `wait()` should exist for one session - # TODO Support multiple clients - self._signal.set() - self._signal.clear() - self._users += 1 - return self - - def __exit__(self, exc_type, exc_value, traceback): - self._users -= 1 - - def push(self, event): - if self._users > 0: - self._queue.append(event) - self._signal.set() - self._signal.clear() - - def wait(self): - self._queue.clear() - self._signal.wait() - return bool(self._queue) - - def __iter__(self): - return iter(self._queue) diff --git a/sugar_network/client/objects.py b/sugar_network/client/objects.py deleted file mode 100644 index 4b64f2f..0000000 --- a/sugar_network/client/objects.py +++ /dev/null @@ -1,191 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import logging -from cStringIO import StringIO -from os.path import isdir, abspath - -from sugar_network.client.bus import Client -from sugar_network.toolkit import sugar -from active_toolkit import enforce - - -_logger = logging.getLogger('sugar_network.objects') - - -class Object(object): - - def __init__(self, mountpoint, document, reply, guid=None, props=None, - offset=None, **kwargs): - self.mountpoint = mountpoint - self.document = document - self._reply = reply or [] - self._guid = guid - self._props = props or {} - self._blobs = {} - self._dirty = set() - self.offset = offset - - for prop, value in kwargs.items(): - self[prop] = value - - @property - def guid(self): - return self._guid - - def get(self, prop): - if prop == 'guid': - return self._guid - result = self._props.get(prop) - if result is None: - enforce(prop in self._reply, - 'Access to not requested %r property in %r from %r mount', - prop, self.document, self.mountpoint) - self.fetch() - result = self._props.get(prop) - return result - - def fetch(self, props=None): - enforce(self._guid, 'Object needs to be posted first') - - to_fetch = [] - for prop in (props or self._reply): - if prop not in self._props: - to_fetch.append(prop) - if not to_fetch: - return - - response = Client.call('GET', mountpoint=self.mountpoint, - document=self.document, guid=self._guid, reply=to_fetch) - response.update(self._props) - self._props = response - - def post(self): - if not self._dirty: - return - - props = {} - for i in self._dirty: - props[i] = self._props.get(i) - - if self._guid: - Client.call('PUT', mountpoint=self.mountpoint, - document=self.document, guid=self._guid, content=props, - content_type='application/json') - else: - props['user'] = [sugar.uid()] - self._guid = Client.call('POST', mountpoint=self.mountpoint, - document=self.document, content=props, - content_type='application/json') - - self._dirty.clear() - return self._guid - - def get_blob_path(self, prop): - blob, is_path = self._get_blob(prop) - if is_path: - return blob['path'], blob['mime_type'] - else: - return None, None - - def get_blob(self, prop): - blob, is_path = self._get_blob(prop) - if is_path: - path = blob['path'] - if path is None: - return _empty_blob - enforce(not isdir(path), 'Requested BLOB is a dictionary') - return _Blob(path, blob['mime_type']) - elif blob is not None: - return _StringIO(blob.encode('utf8')) - else: - return _empty_blob - - def upload_blob(self, prop, path, pass_ownership=False): - enforce(self._guid, 'Object needs to be posted first') - Client.call('PUT', 'upload_blob', mountpoint=self.mountpoint, - document=self.document, guid=self._guid, prop=prop, - path=abspath(path), pass_ownership=pass_ownership) - - def _get_blob(self, prop): - blob = self._blobs.get(prop) - if blob is None: - blob = Client.call('GET', 'get_blob', mountpoint=self.mountpoint, - document=self.document, guid=self._guid, prop=prop) - self._blobs[prop] = blob - return blob, isinstance(blob, dict) and 'path' in blob - - def __getitem__(self, prop): - result = self.get(prop) - enforce(result is not None, KeyError, - 'Property %r is absent in %r from %r mount', - prop, self.document, self.mountpoint) - return result - - def __setitem__(self, prop, value): - enforce(prop != 'guid', 'Property "guid" is read-only') - if self._props.get(prop) == value: - return - self._props[prop] = value - self._dirty.add(prop) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.post() - - -class _Blob(file): - - def __init__(self, path, mime_type): - file.__init__(self, path) - self.mime_type = mime_type - - -class _EmptyBlob(object): - - closed = True - mime_type = 'application/octet-stream' - - def read(self, size=None): - return '' - - def close(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - -class _StringIO(object): - - def __init__(self, *args, **kwargs): - self._stream = StringIO(*args, **kwargs) - - def __getattr__(self, name): - return getattr(self._stream, name) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - -_empty_blob = _EmptyBlob() diff --git a/sugar_network/local/__init__.py b/sugar_network/local/__init__.py index 36ed8df..e02cc1a 100644 --- a/sugar_network/local/__init__.py +++ b/sugar_network/local/__init__.py @@ -74,6 +74,10 @@ anonymous = Option( 'only read-only operations are available in this mode', default=False, type_cast=Option.bool_cast, action='store_true') +ipc_port = Option( + 'port number to listen for incomming connections from IPC clients', + default=5001, type_cast=int) + def path(*args): """Calculate a path from the root. diff --git a/sugar_network/local/bus.py b/sugar_network/local/bus.py deleted file mode 100644 index 5d57840..0000000 --- a/sugar_network/local/bus.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import json -import errno -import socket -import logging -from os.path import exists - -import active_document as ad -from sugar_network import local -from sugar_network.toolkit import ipc, sugar -from active_toolkit import coroutine, sockets, util - - -_logger = logging.getLogger('local.bus') - - -class IPCServer(object): - - def __init__(self, mounts): - self._subscriptions = [] - self._mounts = mounts - self._publish_lock = coroutine.Lock() - self._servers = coroutine.Pool() - - self._mounts.connect(self._republish) - - self._servers.spawn( - _start_server('accept', self._serve_client)) - self._servers.spawn( - _start_server('subscribe', self._serve_subscription)) - - def serve_forever(self): - # Clients write to rendezvous named pipe, in block mode, - # to make sure that server is started - rendezvous = ipc.rendezvous(server=True) - try: - self._servers.join() - except KeyboardInterrupt: - pass - finally: - os.close(rendezvous) - - def stop(self): - while self._subscriptions: - self._subscriptions.pop().close() - self._servers.kill() - - def _serve_client(self, conn_file): - while True: - message = conn_file.read_message() - if message is None: - break - try: - request = ad.Request(message) - request.principal = sugar.uid() - request.access_level = ad.ACCESS_LOCAL - - content_type = request.pop('content_type') - if content_type == 'application/json': - request.content = json.loads(conn_file.read()) - elif content_type: - request.content_stream = conn_file - else: - request.content = conn_file.read() or None - - response = ad.Response() - result = self._mounts.call(request, response) - conn_file.write_message(result) - - except Exception, error: - conn_file.write_message({'error': str(error)}) - - def _serve_subscription(self, conn_file): - _logger.debug('Added new %r subscription', conn_file) - self._subscriptions.append(conn_file) - return True - - def _republish(self, event): - _logger.debug('Send notification: %r', event) - - with self._publish_lock: - for sock in self._subscriptions[:]: - try: - sock.write_message(event) - except socket.error, error: - if error.errno == errno.EPIPE: - _logger.debug('Lost %r subscription', sock) - self._subscriptions.remove(sock) - else: - util.exception(_logger, - 'Failed to deliver event via %r', sock) - - -def _start_server(name, serve_cb): - accept_path = local.ensure_path('run', name) - if exists(accept_path): - os.unlink(accept_path) - - # pylint: disable-msg=E1101 - accept = coroutine.socket(socket.AF_UNIX) - accept.bind(accept_path) - accept.listen(5) - - def connection_cb(conn, address): - conn_file = sockets.SocketFile(conn) - _logger.debug('New %r connection: %r', name, conn_file) - do_not_close = False - try: - do_not_close = serve_cb(conn_file) - finally: - _logger.debug('Quit %r connection: %r', name, conn_file) - if not do_not_close: - conn_file.close() - - return coroutine.Server(accept, connection_cb).serve_forever diff --git a/sugar_network/client/dbus_client.py b/sugar_network/local/dbus_client.py index a8682da..a8682da 100644 --- a/sugar_network/client/dbus_client.py +++ b/sugar_network/local/dbus_client.py diff --git a/sugar_network/local/ipc_client.py b/sugar_network/local/ipc_client.py new file mode 100644 index 0000000..20a13aa --- /dev/null +++ b/sugar_network/local/ipc_client.py @@ -0,0 +1,30 @@ +# Copyright (C) 2012 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import active_document as ad + +from sugar_network.toolkit import router +from sugar_network.local import sugar + + +class Router(router.Router): + + def authenticate(self, request): + return sugar.uid() + + def call(self, request, response): + request.access_level = ad.ACCESS_LOCAL + response.content_type = 'application/json' + return router.Router.call(self, request, response) diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py index 747d3fa..d0297b5 100644 --- a/sugar_network/local/mounts.py +++ b/sugar_network/local/mounts.py @@ -17,7 +17,6 @@ import os import json import shutil import logging -from urlparse import urlparse from os.path import isabs, exists, join, basename, isdir from gettext import gettext as _ @@ -27,7 +26,7 @@ from sweets_recipe import Bundle from sugar_network.toolkit import sugar, http from sugar_network.local import activities, cache from sugar_network import local, checkin, sugar -from active_toolkit import sockets, util, coroutine, enforce +from active_toolkit import util, coroutine, enforce _LOCAL_PROPS = { @@ -105,8 +104,7 @@ class LocalMount(ad.VolumeCommands, _Mount): ad.VolumeCommands.before_create(self, request, props) def _events_cb(self, event): - if 'mountpoint' not in event: - event['mountpoint'] = self.mountpoint + event['mountpoint'] = self.mountpoint self.publish(event) @@ -137,6 +135,7 @@ class HomeMount(LocalMount): util.exception(_logger, 'Failed to read %r spec file', path) continue + print '>', request.access_level == ad.ACCESS_LOCAL if request.access_level == ad.ACCESS_LOCAL: impl_id = spec.root else: @@ -295,6 +294,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): _Mount.__init__(self) _ProxyCommands.__init__(self, home_volume) + self._client = None self._seqno = 0 self._remote_volume_guid = None self._api_urls = [] @@ -308,24 +308,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): try: return ad.CommandsProcessor.call(self, request, response) except ad.CommandNotFound: - pass - - method = request.pop('method') - document = request.pop('document') - guid = request.pop('guid') if 'guid' in request else None - prop = request.pop('prop') if 'prop' in request else None - - path = [document] - if guid: - path.append(guid) - if prop: - path.append(prop) - - return http.request(method, path, data=request.content, - params=request, headers={ - 'Content-Type': 'application/json', - 'Accept-Language': ','.join(request.accept_language), - }) + return self._client.call(request) return self._proxy_call(request, response, super_call) @@ -340,7 +323,7 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): def get_blob(self, document, guid, prop): def download(path, seqno): - return http.download([document, guid, prop], path, seqno, + return self._client.download([document, guid, prop], path, seqno, document == 'implementation' and prop == 'data') return cache.get_blob(document, guid, prop, self._seqno, @@ -352,7 +335,8 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): try: with file(path, 'rb') as f: - http.request('PUT', [document, guid, prop], files={'file': f}) + self._client.request('PUT', [document, guid, prop], + files={'file': f}) finally: if pass_ownership and exists(path): os.unlink(path) @@ -365,49 +349,30 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): self._connections.spawn(self._connect) def _connect(self): - - def connect(url): - _logger.debug('Connecting to %r master', url) - local.api_url.value = url - subscription = http.request('POST', [''], - params={'cmd': 'subscribe'}, - headers={'Content-Type': 'application/json'}) - conn = sockets.SocketFile(coroutine.socket()) - conn.connect((urlparse(url).hostname, subscription['port'])) - conn.write_message({'ticket': subscription['ticket']}) - return conn - - def listen_events(url, conn): - stat = http.request('GET', [], params={'cmd': 'stat'}, - headers={'Content-Type': 'application/json'}) - # pylint: disable-msg=E1103 - self._seqno = stat.get('seqno') or 0 - self._remote_volume_guid = stat.get('guid') - - _logger.info('Connected to %r master', url) - _Mount.set_mounted(self, True) - - while True: - coroutine.select([conn.fileno()], [], []) - event = conn.read_message() - if event is None: - break - - seqno = event.get('seqno') - if seqno: - self._seqno = seqno - - event['mountpoint'] = self.mountpoint - self.publish(event) - for url in self._api_urls: try: - conn = connect(url) + _logger.debug('Connecting to %r master', url) + self._client = http.Client(url) + subscription = self._client.subscribe() except Exception: util.exception(_logger, 'Cannot connect to %r master', url) continue + try: - listen_events(url, conn) + stat = self._client.get(cmd='stat') + # pylint: disable-msg=E1103 + self._seqno = stat.get('seqno') or 0 + self._remote_volume_guid = stat.get('guid') + + _logger.info('Connected to %r master', url) + _Mount.set_mounted(self, True) + + for event in subscription: + seqno = event.get('seqno') + if seqno: + self._seqno = seqno + event['mountpoint'] = self.mountpoint + self.publish(event) except Exception: util.exception(_logger, 'Failed to dispatch remote event') finally: diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py index 175bf02..8fd63a1 100644 --- a/sugar_network/local/mountset.py +++ b/sugar_network/local/mountset.py @@ -21,12 +21,11 @@ import active_document as ad from sugar_network import local, node from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor +from sugar_network.toolkit.router import Router from sugar_network.local.mounts import LocalMount, NodeMount -from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.node.commands import NodeCommands from sugar_network.node.sync_node import SyncCommands -from sugar_network.node.router import Router -from sugar_network.resources.volume import Volume +from sugar_network.resources.volume import Volume, Commands from active_toolkit import util, coroutine, enforce @@ -35,13 +34,9 @@ _DB_DIRNAME = '.sugar-network' _logger = logging.getLogger('local.mountset') -class Mountset(dict, ad.CommandsProcessor, SyncCommands): +class Mountset(dict, ad.CommandsProcessor, Commands, SyncCommands): def __init__(self, home_volume): - dict.__init__(self) - ad.CommandsProcessor.__init__(self) - SyncCommands.__init__(self, local.path('sync')) - self.opened = coroutine.Event() self.home_volume = home_volume self._subscriptions = {} @@ -49,6 +44,11 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands): self._jobs = coroutine.Pool() self._servers = coroutine.Pool() + dict.__init__(self) + ad.CommandsProcessor.__init__(self) + SyncCommands.__init__(self, local.path('sync')) + Commands.__init__(self) + def __getitem__(self, mountpoint): enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint) return self.get(mountpoint) @@ -133,22 +133,18 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands): return list(requires) - @ad.volume_command(method='POST', cmd='publish') - def republish(self, request): - self.publish(request.content) - def call(self, request, response=None): if response is None: response = ad.Response() request.accept_language = [self._lang] - mountpoint = request.get('mountpoint') + mountpoint = request.get('mountpoint') or '/' try: try: result = ad.CommandsProcessor.call(self, request, response) except ad.CommandNotFound: - enforce('mountpoint' in request, 'No \'mountpoint\' argument') - request.pop('mountpoint') + if 'mountpoint' in request: + request.pop('mountpoint') mount = self[mountpoint] if mountpoint == '/': mount.set_mounted(True) @@ -254,16 +250,11 @@ class Mountset(dict, ad.CommandsProcessor, SyncCommands): self._jobs.spawn(volume.populate) if server_mode: - subscriber = SubscribeSocket(volume, - node.host.value, node.subscribe_port.value) - cp = NodeCommands(volume, subscriber) - _logger.info('Start %r server on %s port', volume.root, node.port.value) server = coroutine.WSGIServer(('0.0.0.0', node.port.value), - Router(cp)) + Router(NodeCommands(volume))) self._servers.spawn(server.serve_forever) - self._servers.spawn(subscriber.serve_forever) # Let servers start before publishing mount event coroutine.dispatch() diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index 94499d2..e73e2d3 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -25,10 +25,6 @@ port = Option( 'port number to listen incomming connections', default=8000, type_cast=int, name='port') -subscribe_port = Option( - 'port number to listen incomming subscribtion requests', - default=8001, type_cast=int, name='subscribe_port') - keyfile = Option( 'path to SSL certificate keyfile to serve requests via HTTPS', name='keyfile') @@ -47,11 +43,6 @@ data_root = Option( 'path to a directory to place server data', default='/var/lib/sugar-network', name='data_root') -only_commit_events = Option( - 'subscribers can be notified only with "commit" events; ' - 'that is useful to minimize interactions between server and clients', - default=False, type_cast=Option.bool_cast, action='store_true') - find_limit = Option( 'limit the resulting list for search requests', default=32, type_cast=int) @@ -70,21 +61,3 @@ pull_timeout = Option( 'delay in seconds to return to sync-pull requester to wait until ' 'pull request will be ready', default=30, type_cast=int) - - -class HTTPStatus(Exception): - - status = None - headers = None - result = None - - -class BadRequest(HTTPStatus): - - status = '400 Bad Request' - - -class Unauthorized(HTTPStatus): - - status = '401 Unauthorized' - headers = {'WWW-Authenticate': 'Sugar'} diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index 74adad2..e8f2e03 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -21,6 +21,8 @@ from os.path import exists, join import active_document as ad from sugar_network import node from sugar_network.node.sync_master import SyncCommands +from sugar_network.resources.volume import Commands +from sugar_network.toolkit import router from active_toolkit import util, enforce @@ -29,11 +31,11 @@ _DEFAULT_MASTER_GUID = 'api-testing.network.sugarlabs.org' _logger = logging.getLogger('node.commands') -class NodeCommands(ad.VolumeCommands): +class NodeCommands(ad.VolumeCommands, Commands): - def __init__(self, volume, subscriber=None): + def __init__(self, volume): ad.VolumeCommands.__init__(self, volume) - self._subscriber = subscriber + Commands.__init__(self) self._is_master = False node_path = join(volume.root, 'node') @@ -62,6 +64,9 @@ class NodeCommands(ad.VolumeCommands): if isinstance(prop, ad.BlobProperty): self._blobs[document].add(prop.name) + def connect(self, callback, condition=None, **kwargs): + self.volume.connect(callback, condition) + @ad.volume_command(method='GET') def hello(self, response): response.content_type = 'text/html' @@ -74,11 +79,6 @@ class NodeCommands(ad.VolumeCommands): 'seqno': self.volume.seqno.value, } - @ad.volume_command(method='POST', cmd='subscribe') - def subscribe(self): - enforce(self._subscriber is not None, 'Subscription is disabled') - return self._subscriber.new_ticket() - @ad.document_command(method='DELETE', permissions=ad.ACCESS_AUTH | ad.ACCESS_AUTHOR) def delete(self, document, guid): @@ -130,10 +130,11 @@ class NodeCommands(ad.VolumeCommands): blobs = reply & self._blobs[document] reply = list(reply - blobs) - if not reply: - reply = ['guid', 'layer'] - else: - reply.append('layer') + if reply: + if 'layer' not in reply: + reply.append('layer') + if 'guid' not in reply: + reply.append('guid') result = ad.VolumeCommands.get(self, document, guid, request, reply) enforce('deleted' not in result['layer'], ad.NotFound, @@ -150,7 +151,7 @@ class NodeCommands(ad.VolumeCommands): return if cmd.permissions & ad.ACCESS_AUTH: - enforce(request.principal is not None, node.Unauthorized, + enforce(request.principal is not None, router.Unauthorized, 'User is not authenticated') if cmd.permissions & ad.ACCESS_AUTHOR and 'guid' in request: @@ -163,7 +164,6 @@ class NodeCommands(ad.VolumeCommands): def before_create(self, request, props): if request['document'] == 'user': props['guid'], props['pubkey'] = _load_pubkey(props['pubkey']) - props['user'] = [props['guid']] else: props['user'] = [request.principal] self._set_author(props) @@ -212,8 +212,8 @@ class NodeCommands(ad.VolumeCommands): class MasterCommands(NodeCommands, SyncCommands): - def __init__(self, volume, subscriber=None): - NodeCommands.__init__(self, volume, subscriber) + def __init__(self, volume): + NodeCommands.__init__(self, volume) SyncCommands.__init__(self) diff --git a/sugar_network/node/subscribe_socket.py b/sugar_network/node/subscribe_socket.py deleted file mode 100644 index 2a32aa0..0000000 --- a/sugar_network/node/subscribe_socket.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import socket -import logging - -from sugar_network import node -from active_toolkit import sockets, coroutine, util, enforce - - -_logger = logging.getLogger('node.subscribe_socket') - - -class SubscribeSocket(object): - - def __init__(self, volume, host, port): - self._host = host - self._port = port - self._server = None - self._tickets = set() - self._subscribers = set() - - volume.connect(self.__signal_cb) - - def new_ticket(self): - ticket = os.urandom(16).encode('hex') - self._tickets.add(ticket) - return {'host': self._host, 'port': self._port, 'ticket': ticket} - - def serve_forever(self): - _logger.info('Listening for subscriptions on %s port', self._port) - - conn = coroutine.socket(socket.AF_INET, socket.SOCK_STREAM) - # pylint: disable-msg=E1101 - conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - conn.bind((self._host, self._port)) - conn.listen(5) - - self._server = coroutine.Server(conn, self._serve_client) - try: - self._server.serve_forever() - finally: - self._server.stop() - self._server = None - - def stop(self): - if self._server is not None: - self._server.stop() - - def _serve_client(self, conn, host): - _logger.debug('Got request from %r, making a handshake', host) - - try: - handshake = sockets.SocketFile(conn).read_message() - ticket = handshake.get('ticket') - enforce(ticket and ticket in self._tickets, 'Unknown request') - self._tickets.remove(ticket) - except Exception, error: - _logger.warning('Handshake failed, discard the request: %s', error) - return - - _logger.debug('Accepted %r subscriber', host) - self._subscribers.add(conn) - try: - data = conn.recv(sockets.BUFFER_SIZE) - enforce(not data, 'Subscriber misused connection ' - 'by sending %s bytes, discard it', len(data)) - except Exception: - util.exception('Failed to handle subscription from %r', host) - finally: - _logger.debug('Close subscription from %r', host) - self._subscribers.remove(conn) - - def __signal_cb(self, event): - if node.only_commit_events.value: - if event['event'] != 'commit': - return - else: - if event['event'] == 'commit': - # Subscribers already got update notifications enough - return - - for conn in self._subscribers: - sockets.SocketFile(conn).write_message(event) diff --git a/sugar_network/resources/__init__.py b/sugar_network/resources/__init__.py index 99eacc8..fd955e1 100644 --- a/sugar_network/resources/__init__.py +++ b/sugar_network/resources/__init__.py @@ -13,7 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -CONTEXT_TYPES = ['activity', 'project'] +CONTEXT_TYPES = ['activity', 'project', 'package'] COMMENT_PARENTS = ['feedback', 'solution'] NOTIFICATION_TYPES = ['create', 'update', 'delete', 'vote'] FEEDBACK_TYPES = ['question', 'idea', 'problem', 'review'] diff --git a/sugar_network/resources/context.py b/sugar_network/resources/context.py index 85c004b..5b7931f 100644 --- a/sugar_network/resources/context.py +++ b/sugar_network/resources/context.py @@ -103,3 +103,17 @@ class Context(Resource): @ad.active_property(ad.StoredProperty, typecast=[int], default=(-1, -1)) def position(self, value): return value + + @classmethod + @ad.directory_command(method='PUT', cmd='include', + arguments={'layers': ad.to_list}) + def include(cls, directory, layers, request): + import logging + logging.error('include> %r %r', layers, request.content) + + @classmethod + @ad.directory_command(method='PUT', cmd='exclude', + arguments={'layers': ad.to_list}) + def exclude(cls, directory, layers, request): + import logging + logging.error('exclude> %r %r', layers, request.content) diff --git a/sugar_network/resources/user.py b/sugar_network/resources/user.py index a120832..717888c 100644 --- a/sugar_network/resources/user.py +++ b/sugar_network/resources/user.py @@ -15,10 +15,15 @@ import active_document as ad from sugar_network.node import stats +from active_toolkit import enforce class User(ad.Document): + @ad.active_property(prefix='L', typecast=[], default=['public']) + def layer(self, value): + return value + @ad.active_property(slot=1, prefix='N', full_text=True) def name(self, value): return value @@ -43,7 +48,7 @@ class User(ad.Document): def tags(self, value): return value - @ad.active_property(slot=5, prefix='L', full_text=True, default='') + @ad.active_property(slot=5, prefix='P', full_text=True, default='') def location(self, value): return value @@ -51,9 +56,11 @@ class User(ad.Document): def birthday(self, value): return value - @ad.document_command(method='GET', cmd='stats-info', - permissions=ad.ACCESS_AUTHOR) - def _stats_info(self): + @ad.document_command(method='GET', cmd='stats-info') + def _stats_info(self, request): + enforce(request.principal == self['guid'], ad.Forbidden, + 'Operation is permitted only for authors') + status = {} rrd = stats.get_rrd(self.guid) for name, __, last_update in rrd.dbs: @@ -65,9 +72,11 @@ class User(ad.Document): 'status': status, } - @ad.document_command(method='POST', cmd='stats-upload', - permissions=ad.ACCESS_AUTHOR) + @ad.document_command(method='POST', cmd='stats-upload') def _stats_upload(self, request): + enforce(request.principal == self['guid'], ad.Forbidden, + 'Operation is permitted only for authors') + name = request.content['name'] values = request.content['values'] rrd = stats.get_rrd(self.guid) diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py index 7344f5a..c6ac59b 100644 --- a/sugar_network/resources/volume.py +++ b/sugar_network/resources/volume.py @@ -13,6 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import json import logging from os.path import join @@ -29,8 +30,12 @@ _logger = logging.getLogger('resources.volume') class Resource(ad.Document): - @ad.active_property(prefix='RL', full_text=True, typecast=[], - default=['public']) + @ad.active_property(prefix='RU', typecast=[], default=[], + permissions=ad.ACCESS_CREATE | ad.ACCESS_READ) + def user(self, value): + return value + + @ad.active_property(prefix='RL', typecast=[], default=['public']) def layer(self, value): return value @@ -78,7 +83,7 @@ class Volume(ad.SingleVolume): return self[record['document']].merge(increment_seqno=increment_seqno, **record) - def diff(self, in_seq, out_packet, clone=False): + def diff(self, in_seq, out_packet): # Since `in_seq` will be changed in `patch()`, original sequence # should be passed as-is to every document's `diff()` because # seqno handling is common for all documents @@ -90,8 +95,7 @@ class Volume(ad.SingleVolume): directory.commit() def patch(): - for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK, - clone=clone): + for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK): coroutine.dispatch() seqno = None @@ -128,3 +132,45 @@ class Volume(ad.SingleVolume): # this place, `push_seq` should contain not-collapsed sequence orig_seq.floor(push_seq.last) out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq) + + +class Commands(object): + + def __init__(self): + self._notifier = coroutine.AsyncResult() + self.connect(lambda event: self._notify(event)) + + def connect(self, callback, condition=None, **kwargs): + raise NotImplementedError() + + @ad.volume_command(method='GET', cmd='subscribe') + def subscribe(self, response, only_commits=False): + """Subscribe to Server-Sent Events. + + :param only_commits: + subscribers can be notified only with "commit" events; + that is useful to minimize interactions between server and clients + + """ + response.content_type = 'text/event-stream' + response['Cache-Control'] = 'no-cache' + return self._pull_events(only_commits) + + def _pull_events(self, only_commits): + while True: + event = self._notifier.get() + + if only_commits: + if event['event'] != 'commit': + continue + else: + if event['event'] == 'commit': + # Subscribers already got update notifications enough + continue + + yield 'data: %s\n\n' % json.dumps(event) + + def _notify(self, event): + self._notifier.set(event) + self._notifier = coroutine.AsyncResult() + coroutine.dispatch() diff --git a/sugar_network/toolkit/dbus_thread.py b/sugar_network/toolkit/dbus_thread.py index b811a80..1cf2380 100644 --- a/sugar_network/toolkit/dbus_thread.py +++ b/sugar_network/toolkit/dbus_thread.py @@ -31,7 +31,7 @@ _services = [] _call_queue = coroutine.AsyncQueue() -def start(mountset): +def start(commands_processor): gobject.threads_init() threads_init() @@ -44,11 +44,11 @@ def start(mountset): for service in _services: spawn(service.handle_event, event) - mountset.connect(handle_event) + commands_processor.connect(handle_event) try: for request, reply_cb, error_cb, args in _call_queue: try: - reply = mountset.call(request) + reply = commands_processor.call(request) except Exception, error: util.exception(_logger, 'DBus %r request failed', request) if error_cb is not None: @@ -59,7 +59,7 @@ def start(mountset): args += [reply] spawn(reply_cb, *args) finally: - mountset.disconnect(handle_event) + commands_processor.disconnect(handle_event) spawn(mainloop.quit) thread.join() diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 3efee83..627103d 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -28,6 +28,7 @@ import requests from requests.sessions import Session from M2Crypto import DSA +import active_document as ad from sweets_recipe import Bundle from active_toolkit.sockets import decode_multipart, BUFFER_SIZE from sugar_network.toolkit import sugar @@ -40,101 +41,22 @@ from gevent.monkey import patch_socket patch_socket(dns=False) -_logger = logging.getLogger('toolkit.http') -_session = None +_logger = logging.getLogger('http') -def reset(): - global _session - _session = None +class Client(object): + def __init__(self, api_url, **kwargs): + self._api_url = api_url + self._params = kwargs -def download(url_path, out_path, seqno=None, extract=False): - if isdir(out_path): - shutil.rmtree(out_path) - elif not exists(dirname(out_path)): - os.makedirs(dirname(out_path)) - - params = {} - if seqno: - params['seqno'] = seqno - - response = _request('GET', url_path, allow_redirects=True, - params=params, allowed_response=[404]) - if response.status_code != 200: - return 'application/octet-stream' - - mime_type = response.headers.get('Content-Type') or \ - 'application/octet-stream' - - content_length = response.headers.get('Content-Length') - content_length = int(content_length) if content_length else 0 - if seqno and not content_length: - # Local cacheed versions is not stale - return mime_type - - def fetch(f): - _logger.debug('Download %r BLOB to %r', '/'.join(url_path), out_path) - chunk_size = min(content_length, BUFFER_SIZE) - empty = True - for chunk in response.iter_content(chunk_size=chunk_size): - empty = False - f.write(chunk) - return not empty - - def fetch_multipart(stream, size, boundary): - stream.readline = None - for filename, content in decode_multipart(stream, size, boundary): - dst_path = join(out_path, filename) - if not exists(dirname(dst_path)): - os.makedirs(dirname(dst_path)) - shutil.move(content.name, dst_path) - - content_type, params = cgi.parse_header(mime_type) - if content_type.split('/', 1)[0] == 'multipart': - try: - fetch_multipart(response.raw, content_length, params['boundary']) - except Exception: - shutil.rmtree(out_path, ignore_errors=True) - raise - elif extract: - tmp_file = tempfile.NamedTemporaryFile(delete=False) - try: - if fetch(tmp_file): - tmp_file.close() - with Bundle(tmp_file.name, 'application/zip') as bundle: - bundle.extractall(out_path) - finally: - if exists(tmp_file.name): - os.unlink(tmp_file.name) - else: - with file(out_path, 'w') as f: - if not fetch(f): - os.unlink(out_path) - - return mime_type - - -def request(method, path, data=None, headers=None, **kwargs): - response = _request(method, path, data, headers, **kwargs) - if response.headers.get('Content-Type') == 'application/json': - return json.loads(response.content) - else: - return response - - -def _request(method, path, data=None, headers=None, allowed_response=None, - **kwargs): - global _session - - if _session is None: verify = True if local.no_check_certificate.value: verify = False elif local.certfile.value: verify = local.certfile.value - headers = None + headers = {'Accept-Language': ','.join(ad.default_lang())} if not local.anonymous.value: uid = sugar.uid() key_path = sugar.profile_path('owner.key') @@ -143,61 +65,190 @@ def _request(method, path, data=None, headers=None, allowed_response=None, 'sugar_user_signature': _sign(key_path, uid), } - _session = Session(headers=headers, verify=verify, prefetch=False) - - if not path: - path = [''] - if not isinstance(path, basestring): - path = '/'.join([i.strip('/') for i in [local.api_url.value] + path]) - - if data is not None and headers and \ - headers.get('Content-Type') == 'application/json': - data = json.dumps(data) - - while True: - try: - response = requests.request(method, path, data=data, - headers=headers, session=_session, **kwargs) - except requests.exceptions.SSLError: - _logger.warning('Pass --no-check-certificate to avoid SSL checks') - raise - + self._session = Session(headers=headers, verify=verify, prefetch=False) + + def get(self, path_=None, **kwargs): + kwargs.update(self._params) + return self.request('GET', path_, params=kwargs) + + def post(self, path_=None, data_=None, **kwargs): + kwargs.update(self._params) + return self.request('POST', path_, data_, + headers={'Content-Type': 'application/json'}, params=kwargs) + + def put(self, path_=None, data_=None, **kwargs): + kwargs.update(self._params) + return self.request('PUT', path_, data_, + headers={'Content-Type': 'application/json'}, params=kwargs) + + def delete(self, path_=None, **kwargs): + kwargs.update(self._params) + return self.request('DELETE', path_, params=kwargs) + + def request(self, method, path=None, data=None, headers=None, **kwargs): + response = self._request(method, path, data, headers, **kwargs) + if response.headers.get('Content-Type') == 'application/json': + return json.loads(response.content) + else: + return response + + def call(self, request): + method = request.pop('method') + document = request.pop('document') + guid = request.pop('guid') if 'guid' in request else None + prop = request.pop('prop') if 'prop' in request else None + + path = [document] + if guid: + path.append(guid) + if prop: + path.append(prop) + + return self.request(method, path, data=request.content, params=request, + headers={'Content-Type': 'application/json'}) + + def download(self, url_path, out_path, seqno=None, extract=False): + if isdir(out_path): + shutil.rmtree(out_path) + elif not exists(dirname(out_path)): + os.makedirs(dirname(out_path)) + + params = {} + if seqno: + params['seqno'] = seqno + + response = self._request('GET', url_path, allow_redirects=True, + params=params, allowed=[404]) if response.status_code != 200: - if response.status_code == 401: - enforce(not local.anonymous.value, - 'Operation is not available in anonymous mode') - _logger.info('User is not registered on the server, ' - 'registering') - _register() - continue - if allowed_response and response.status_code in allowed_response: - return response - content = response.content + return 'application/octet-stream' + + mime_type = response.headers.get('Content-Type') or \ + 'application/octet-stream' + + content_length = response.headers.get('Content-Length') + content_length = int(content_length) if content_length else 0 + if seqno and not content_length: + # Local cacheed versions is not stale + return mime_type + + def fetch(f): + _logger.debug('Download %r BLOB to %r', + '/'.join(url_path), out_path) + chunk_size = min(content_length, BUFFER_SIZE) + empty = True + for chunk in response.iter_content(chunk_size=chunk_size): + empty = False + f.write(chunk) + return not empty + + def fetch_multipart(stream, size, boundary): + stream.readline = None + for filename, content in decode_multipart(stream, size, boundary): + dst_path = join(out_path, filename) + if not exists(dirname(dst_path)): + os.makedirs(dirname(dst_path)) + shutil.move(content.name, dst_path) + + content_type, params = cgi.parse_header(mime_type) + if content_type.split('/', 1)[0] == 'multipart': try: - error = json.loads(content) + fetch_multipart(response.raw, content_length, + params['boundary']) except Exception: - _logger.debug('Got %s HTTP error for %r request:\n%s', - response.status_code, path, content) - response.raise_for_status() - else: - raise RuntimeError(error['error']) + shutil.rmtree(out_path, ignore_errors=True) + raise + elif extract: + tmp_file = tempfile.NamedTemporaryFile(delete=False) + try: + if fetch(tmp_file): + tmp_file.close() + with Bundle(tmp_file.name, 'application/zip') as bundle: + bundle.extractall(out_path) + finally: + if exists(tmp_file.name): + os.unlink(tmp_file.name) + else: + with file(out_path, 'w') as f: + if not fetch(f): + os.unlink(out_path) - return response + return mime_type + def subscribe(self): + response = self.request('GET', params={'cmd': 'subscribe'}) + for line in _readlines(response.raw): + if line.startswith('data: '): + yield json.loads(line.split(' ', 1)[1]) -def _register(): - _request('POST', ['user'], - headers={'Content-Type': 'application/json'}, - data={ - 'name': sugar.nickname() or '', - 'color': sugar.color() or '#000000,#000000', - 'machine_sn': sugar.machine_sn() or '', - 'machine_uuid': sugar.machine_uuid() or '', - 'pubkey': sugar.pubkey(), - }, - ) + def _request(self, method, path, data=None, headers=None, allowed=None, + **kwargs): + if not path: + path = [''] + if not isinstance(path, basestring): + path = '/'.join([i.strip('/') for i in [self._api_url] + path]) + + if data is not None and headers and \ + headers.get('Content-Type') == 'application/json': + data = json.dumps(data) + + while True: + try: + response = requests.request(method, path, data=data, + headers=headers, session=self._session, **kwargs) + except requests.exceptions.SSLError: + _logger.warning('Use --no-check-certificate to avoid checks') + raise + + if response.status_code != 200: + if response.status_code == 401: + enforce(not local.anonymous.value, + 'Operation is not available in anonymous mode') + _logger.info('User is not registered on the server, ' + 'registering') + self._register() + continue + if allowed and response.status_code in allowed: + return response + content = response.content + try: + error = json.loads(content) + except Exception: + _logger.debug('Got %s HTTP error for %r request:\n%s', + response.status_code, path, content) + response.raise_for_status() + else: + raise RuntimeError(error['error']) + + return response + + def _register(self): + self._request('POST', ['user'], + headers={ + 'Content-Type': 'application/json', + }, + data={ + 'name': sugar.nickname() or '', + 'color': sugar.color() or '#000000,#000000', + 'machine_sn': sugar.machine_sn() or '', + 'machine_uuid': sugar.machine_uuid() or '', + 'pubkey': sugar.pubkey(), + }, + ) def _sign(key_path, data): key = DSA.load_key(key_path) return key.sign_asn1(hashlib.sha1(data).digest()).encode('hex') + + +def _readlines(stream): + line = '' + while True: + char = stream.read(1) + if not char: + break + if char == '\n': + yield line + line = '' + else: + line += char diff --git a/sugar_network/toolkit/ipc.py b/sugar_network/toolkit/ipc.py deleted file mode 100644 index 3e9471e..0000000 --- a/sugar_network/toolkit/ipc.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import errno -import logging - -from sugar_network import local - - -_logger = logging.getLogger('toolkit.ipc') - - -def rendezvous(server=False): - """Rendezvous barrier to synchronize one server and multiple clients. - - :param server: - if caller is a server - :returns: - if `server` is `True`, file descriptor that should be closed - on server shutting down - - """ - rendezvous_path = local.ensure_path('run', 'rendezvous') - - try: - os.mkfifo(rendezvous_path) - except OSError, error: - if error.errno != errno.EEXIST: - raise - - if server: - _logger.info('Start accepting clients on %r', rendezvous_path) - return os.open(rendezvous_path, os.O_RDONLY | os.O_NONBLOCK) - else: - _logger.debug('Connecting to %r IPC server', rendezvous_path) - # Will be blocked until server will call `rendezvous(server=True)` - fd = os.open(rendezvous_path, os.O_WRONLY) - _logger.info('Connected successfully') - # No need in fd any more - os.close(fd) - - -def server_exists(server=False): - """The same as `rendezvous()` but for client side and without waiting.""" - rendezvous_path = local.ensure_path('run', 'rendezvous') - - try: - os.mkfifo(rendezvous_path) - except OSError, error: - if error.errno != errno.EEXIST: - raise - - try: - fd = os.open(rendezvous_path, os.O_WRONLY | os.O_NONBLOCK) - except OSError, error: - if error.errno == errno.ENXIO: - _logger.debug('No server launched') - return False - - _logger.info('Connected successfully') - # No need in fd any more - os.close(fd) - return True diff --git a/sugar_network/node/router.py b/sugar_network/toolkit/router.py index ee1fa1c..d02dd28 100644 --- a/sugar_network/node/router.py +++ b/sugar_network/toolkit/router.py @@ -23,12 +23,30 @@ from bisect import bisect_left from os.path import join, isfile import active_document as ad -from sugar_network import node, static +from sugar_network import static from active_toolkit.sockets import BUFFER_SIZE from active_toolkit import util, enforce -_logger = logging.getLogger('node.router') +_logger = logging.getLogger('router') + + +class HTTPStatus(Exception): + + status = None + headers = None + result = None + + +class BadRequest(HTTPStatus): + + status = '400 Bad Request' + + +class Unauthorized(HTTPStatus): + + status = '401 Unauthorized' + headers = {'WWW-Authenticate': 'Sugar'} class Router(object): @@ -41,6 +59,61 @@ class Router(object): # Otherwise ssh-keygen will popup auth dialogs on registeration del os.environ['SSH_ASKPASS'] + def authenticate(self, request): + user = request.environ.get('HTTP_SUGAR_USER') + if user is None: + return None + + if user not in self._authenticated and \ + (request.path != ['user'] or request['method'] != 'POST'): + _logger.debug('Logging %r user', user) + request = ad.Request(method='GET', cmd='exists', + document='user', guid=user) + enforce(self._cp.call(request, ad.Response()), Unauthorized, + 'Principal user does not exist') + self._authenticated.add(user) + + return user + + def call(self, request, response): + if 'HTTP_ORIGIN' in request.environ: + enforce(request.environ['HTTP_ORIGIN'] == 'null', ad.Forbidden, + 'Cross-site is allowed only for local applications') + response['Access-Control-Allow-Origin'] = \ + request.environ['HTTP_ORIGIN'] + + if request['method'] == 'OPTIONS': + # TODO Process OPTIONS request per url? + if request.environ['HTTP_ORIGIN']: + response['Access-Control-Allow-Methods'] = \ + request.environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD'] + response['Access-Control-Allow-Headers'] = \ + request.environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS'] + else: + response['Allow'] = 'GET, POST, PUT, DELETE' + response.content_length = 0 + return None + + request.principal = self.authenticate(request) + if request.path[:1] == ['static']: + static_path = join(static.PATH, *request.path[1:]) + enforce(isfile(static_path), 'No such file') + result = file(static_path) + else: + result = self._cp.call(request, response) + + if hasattr(result, 'read'): + # pylint: disable-msg=E1103 + if hasattr(result, 'fileno'): + response.content_length = os.fstat(result.fileno()).st_size + elif hasattr(result, 'seek'): + result.seek(0, 2) + response.content_length = result.tell() + result.seek(0) + result = _stream_reader(result) + + return result + def __call__(self, environ, start_response): request = _Request(environ) response = _Response() @@ -51,7 +124,7 @@ class Router(object): result = None try: - result = self._call(request, response) + result = self.call(request, response) except ad.Redirect, error: response.status = '303 See Other' response['Location'] = error.location @@ -64,7 +137,7 @@ class Router(object): response.status = '404 Not Found' elif isinstance(error, ad.Forbidden): response.status = '403 Forbidden' - elif isinstance(error, node.HTTPStatus): + elif isinstance(error, HTTPStatus): response.status = error.status response.update(error.headers or {}) result = error.result @@ -101,61 +174,6 @@ class Router(object): elif result is not None: yield result - def _call(self, request, response): - if 'HTTP_ORIGIN' in request.environ: - enforce(request.environ['HTTP_ORIGIN'] == 'null', ad.Forbidden, - 'Cross-site is allowed only for local applications') - response['Access-Control-Allow-Origin'] = \ - request.environ['HTTP_ORIGIN'] - - if request['method'] == "OPTIONS": - # TODO Process OPTIONS request per url? - if request.environ['HTTP_ORIGIN']: - response['Access-Control-Allow-Methods'] = \ - request.environ['HTTP_ACCESS_CONTROL_REQUEST_METHOD'] - response['Access-Control-Allow-Headers'] = \ - request.environ['HTTP_ACCESS_CONTROL_REQUEST_HEADERS'] - else: - response['Allow'] = "GET, POST, PUT, DELETE" - response.content_length = 0 - return None - - request.principal = self._authenticate(request) - if request.path[:1] == ['static']: - static_path = join(static.PATH, *request.path[1:]) - enforce(isfile(static_path), 'No such file') - result = file(static_path) - else: - result = self._cp.call(request, response) - - if hasattr(result, 'read'): - # pylint: disable-msg=E1103 - if hasattr(result, 'fileno'): - response.content_length = os.fstat(result.fileno()).st_size - elif hasattr(result, 'seek'): - result.seek(0, 2) - response.content_length = result.tell() - result.seek(0) - result = _stream_reader(result) - - return result - - def _authenticate(self, request): - user = request.environ.get('HTTP_SUGAR_USER') - if user is None: - return None - - if user not in self._authenticated and \ - (request.path != ['user'] or request['method'] != 'POST'): - _logger.debug('Logging %r user', user) - request = ad.Request(method='GET', cmd='exists', - document='user', guid=user) - enforce(self._cp.call(request, ad.Response()), node.Unauthorized, - 'Principal user does not exist') - self._authenticated.add(user) - - return user - class _Request(ad.Request): @@ -202,7 +220,7 @@ class _Request(ad.Request): self.content_stream = files.list[0].file scope = len(self.path) - enforce(scope >= 0 and scope < 4, node.BadRequest, + enforce(scope >= 0 and scope < 4, BadRequest, 'Incorrect requested path') if scope == 3: self['document'], self['guid'], self['prop'] = self.path diff --git a/sugar_network/zerosugar/feeds.py b/sugar_network/zerosugar/feeds.py index 26a4eb8..a1542b6 100644 --- a/sugar_network/zerosugar/feeds.py +++ b/sugar_network/zerosugar/feeds.py @@ -34,8 +34,9 @@ def read(context): client = None for client in config.clients: try: - with client.Context(context).get_blob('feed') as f: - enforce(not f.closed, 'No feed for %r context', context) + blob = client.get(['context', context, 'feed'], cmd='get_blob') + enforce(blob and 'path' in blob, 'No feed for %r context', context) + with file(blob['path']) as f: feed_content = json.load(f) if feed_content: break diff --git a/sugar_network/zerosugar/injector.py b/sugar_network/zerosugar/injector.py index af80321..2189ab8 100644 --- a/sugar_network/zerosugar/injector.py +++ b/sugar_network/zerosugar/injector.py @@ -30,7 +30,7 @@ from zeroinstall.injector.requirements import Requirements from sweets_recipe import Spec from sugar_network.zerosugar import solver from sugar_network.zerosugar.config import config -from sugar_network import local, Client +from sugar_network import local from sugar_network.toolkit import sugar from active_toolkit import coroutine, util, enforce @@ -129,20 +129,20 @@ def _fork(callback, mountpoint, context, *args): os.close(fd_w) return Pipe(pid, fd_r) + from sugar_network import IPCClient + os.close(fd_r) global _pipe _pipe = fd_w - Client.close() - def thread_func(): log_path = _setup_logging(context) _progress('stat', log_path=log_path, mountpoint=mountpoint, context=context) - config.clients = [Client('~')] + config.clients = [IPCClient(mountpoint='~')] if mountpoint != '~': - config.clients.append(Client(mountpoint)) + config.clients.append(IPCClient(mountpoint=mountpoint)) try: callback(mountpoint, context, *args) @@ -150,7 +150,7 @@ def _fork(callback, mountpoint, context, *args): util.exception(_logger) _progress('failure', error=str(error)) - # To avoid execution current thread coroutine + # Avoid a mess with current thread coroutines thread = threading.Thread(target=thread_func) thread.start() thread.join() @@ -219,9 +219,10 @@ def _make(context, command): # TODO Per download progress _progress('download', progress=-1) - impl = sel.client.Implementation(sel.id) - impl_path, __ = impl.get_blob_path('data') - enforce(impl_path, 'Cannot download implementation') + impl = sel.client.get(['implementation', sel.id, 'data'], + cmd='get_blob') + enforce(impl and 'path' in impl, 'Cannot download implementation') + impl_path = impl['path'] dl = sel.download_sources[0] if dl.extract is not None: diff --git a/tests/__init__.py b/tests/__init__.py index 2c455c0..18a2750 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -17,17 +17,15 @@ from M2Crypto import DSA import active_document as ad from active_toolkit import coroutine -from sugar_network.client import bus from sugar_network.toolkit import sugar, http, sneakernet, mounts_monitor -from sugar_network.local.bus import IPCServer +from sugar_network.toolkit.router import Router +from sugar_network.local.ipc_client import Router as IPCRouter from sugar_network.local.mounts import HomeMount, RemoteMount from sugar_network.local.mountset import Mountset from sugar_network import local, node from sugar_network.resources.user import User from sugar_network.resources.context import Context -from sugar_network.node.router import Router from sugar_network.node.commands import NodeCommands -from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.node import stats from sugar_network.resources.volume import Volume @@ -74,7 +72,6 @@ class Test(unittest.TestCase): ad.index_flush_threshold.value = 1 node.find_limit.value = 1024 node.tmpdir.value = tmpdir + '/tmp' - node.only_commit_events.value = False node.data_root.value = tmpdir node.sync_dirs.value = [] ad.index_write_queue.value = 10 @@ -83,6 +80,7 @@ class Test(unittest.TestCase): local.api_url.value = 'http://localhost:8800' local.server_mode.value = False local.mounts_root.value = None + local.ipc_port.value = 5101 mounts_monitor.stop() mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1 stats.stats_root.value = tmpdir + '/stats' @@ -107,11 +105,6 @@ class Test(unittest.TestCase): self._logfile = file(self.logfile + '.out', 'a') sys.stdout = sys.stderr = self._logfile - bus._CONNECTION_POOL = 1 - bus.Client.close() - - http.reset() - for handler in logging.getLogger().handlers: logging.getLogger().removeHandler(handler) logging.basicConfig(level=logging.DEBUG, filename=self.logfile) @@ -132,7 +125,6 @@ class Test(unittest.TestCase): def stop_servers(self): if self.mounts is not None: self.mounts.close() - bus.Client.close() if self.server is not None: self.server.stop() while self.forks: @@ -195,7 +187,7 @@ class Test(unittest.TestCase): pid = os.fork() if pid: self.forks.append(pid) - coroutine.sleep(1) + coroutine.sleep(2) return pid self.fork_num += 1 @@ -231,7 +223,8 @@ class Test(unittest.TestCase): self.mounts['~'] = HomeMount(volume) if root: self.mounts['/'] = RemoteMount(volume) - self.server = IPCServer(self.mounts) + self.server = coroutine.WSGIServer( + ('localhost', local.ipc_port.value), IPCRouter(self.mounts)) coroutine.spawn(self.server.serve_forever) self.mounts.open() self.mounts.opened.wait() @@ -263,17 +256,14 @@ class Test(unittest.TestCase): ad.index_write_queue.value = 10 volume = Volume('remote', classes or [User, Context]) - subscriber = SubscribeSocket(volume, 'localhost', 8801) - cp = NodeCommands(volume, subscriber) + cp = NodeCommands(volume) httpd = coroutine.WSGIServer(('localhost', 8800), Router(cp)) try: coroutine.joinall([ coroutine.spawn(httpd.serve_forever), - coroutine.spawn(subscriber.serve_forever), ]) finally: httpd.stop() - subscriber.stop() volume.close() diff --git a/tests/units/__main__.py b/tests/units/__main__.py index 23c08f8..0eac03b 100644 --- a/tests/units/__main__.py +++ b/tests/units/__main__.py @@ -3,26 +3,23 @@ from __init__ import tests from collection import * -from sneakernet import * -from bus import * -from activities import * -from ipc_client import * -from injector import * -from subscribe_socket import * -from router import * from volume import * from local import * from node import * -from mountset import * -from home_mount import * -from remote_mount import * -from node_mount import * -from sync_master import * -from sync_node import * +from dbus_client import * from datastore import * from dbus_datastore import * +from sneakernet import * +from router import * from files_sync import * -from dbus_client import * +from sync_node import * +from sync_master import * from mounts_monitor import * +from activities import * +from home_mount import * +from remote_mount import * +from node_mount import * +from injector import * +from mountset import * tests.main() diff --git a/tests/units/bus.py b/tests/units/bus.py deleted file mode 100755 index c38a69a..0000000 --- a/tests/units/bus.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import time -import json -from os.path import join - -from __init__ import tests - -import active_document as ad -from active_toolkit import coroutine -from sugar_network.client import bus -from sugar_network import Client, ServerError -from sugar_network.local.bus import IPCServer -from sugar_network.local.mountset import Mountset -from sugar_network.resources.volume import Volume - - -class BusTest(tests.Test): - - def test_Rendezvous(self): - - def server(): - time.sleep(1) - volume = Volume('local', []) - mounts = Mountset(volume) - server = IPCServer(mounts) - mounts.call = lambda *args: None - server.serve_forever() - - ts = time.time() - self.fork(server) - - client = Client('/') - client.Context.delete('guid') - assert time.time() - ts >= 1 - - def test_Exception(self): - self.start_server() - - def call_handle(request, response): - raise RuntimeError() - - self.mounts.call = call_handle - - client = Client('/') - self.assertRaises(ServerError, client.Resource.delete, 'fake') - - def test_ConsecutiveRequests(self): - self.start_server() - calls = [] - self.mounts.call = lambda request, response: calls.append(dict(request)) - - bus._CONNECTION_POOL = 3 - - def caller(client, i, n): - getattr(client, 'Resource%s' % i).delete('wait%s%s' % (i, n)) - - ts = time.time() - clients = [Client('/'), Client('/'), Client('/')] - call_jobs = [] - for i, client in enumerate(clients): - for n in range(9): - call_jobs.append(coroutine.spawn(caller, client, i, n)) - - coroutine.joinall(call_jobs) - assert time.time() - ts < 4 - - standard = [] - for i, client in enumerate(clients): - for n in range(9): - standard.append({'method': 'DELETE', 'mountpoint': '/', 'guid': 'wait%s%s' % (i, n), 'document': 'resource%s' % i}) - self.assertEqual( - sorted(standard), - sorted(calls)) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/dbus_client.py b/tests/units/dbus_client.py index 04b2307..3f5c1ef 100755 --- a/tests/units/dbus_client.py +++ b/tests/units/dbus_client.py @@ -5,7 +5,7 @@ import os import sys from os.path import abspath -arg0 = abspath(__file__) +arg0 = abspath(__file__).replace('.pyc', '.py') import dbus import gobject @@ -33,7 +33,7 @@ class DbusClientTest(tests.Test): return self.fork(os.execvp, arg0, [arg0, self.id().split('.')[-1], 'fork']) - coroutine.sleep(3) + coroutine.sleep(1) def test_Call(self): client = DBusClient(mountpoint='~') diff --git a/tests/units/dbus_datastore.py b/tests/units/dbus_datastore.py index f95059a..72398ad 100755 --- a/tests/units/dbus_datastore.py +++ b/tests/units/dbus_datastore.py @@ -6,7 +6,7 @@ import sys import time from os.path import exists, abspath, dirname -arg0 = abspath(__file__) +arg0 = abspath(__file__).replace('.pyc', '.py') import dbus import gobject @@ -88,7 +88,7 @@ class DbusDatastoreTest(tests.Test): 'activity_id': 'activity_id', 'filesize': '0', 'creation_time': '1', - 'timestamp': '1', + 'timestamp': '-1', 'mtime': '1970-01-01T00:00:01', 'tags': 'tags', }, diff --git a/tests/units/home_mount.py b/tests/units/home_mount.py index c9c9119..c0cfaae 100755 --- a/tests/units/home_mount.py +++ b/tests/units/home_mount.py @@ -3,29 +3,30 @@ import os import socket -from os.path import exists +from os.path import exists, abspath from __init__ import tests from active_toolkit import sockets, coroutine -from sugar_network import Client from sugar_network.resources.report import Report +from sugar_network import IPCClient class HomeMountTest(tests.Test): def test_create(self): self.start_server() - local = Client('~') - - guid = local.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + local = IPCClient(mountpoint='~') + + guid = local.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) self.assertNotEqual(None, guid) - res = local.Context(guid, ['title', 'keep', 'keep_impl', 'position']) + res = local.get(['context', guid], reply=['guid', 'title', 'keep', 'keep_impl', 'position']) self.assertEqual(guid, res['guid']) self.assertEqual('title', res['title']) self.assertEqual(False, res['keep']) @@ -34,253 +35,141 @@ class HomeMountTest(tests.Test): def test_update(self): self.start_server() - local = Client('~') - - guid = local.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - context = local.Context(guid) - context['title'] = 'title_2' - context['keep'] = True - context['position'] = (2, 3) - context.post() - - context = local.Context(guid, ['title', 'keep', 'position']) + local = IPCClient(mountpoint='~') + + guid = local.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + + local.put(['context', guid], { + 'title': 'title_2', + 'keep': True, + 'position': (2, 3), + }) + + context = local.get(['context', guid], reply=['title', 'keep', 'position']) self.assertEqual('title_2', context['title']) self.assertEqual(True, context['keep']) self.assertEqual([2, 3], context['position']) - def test_get(self): - self.start_server() - local = Client('~') - - guid = local.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - context = local.Context(guid, ['title', 'keep', 'keep_impl', 'position']) - self.assertEqual(guid, context['guid']) - self.assertEqual('title', context['title']) - self.assertEqual(False, context['keep']) - self.assertEqual(0, context['keep_impl']) - self.assertEqual([-1, -1], context['position']) - def test_find(self): self.start_server() - local = Client('~') - - guid_1 = local.Context( - type='activity', - title='title_1', - summary='summary', - description='description').post() - guid_2 = local.Context( - type='activity', - title='title_2', - summary='summary', - description='description').post() - guid_3 = local.Context( - type='activity', - title='title_3', - summary='summary', - description='description').post() - - cursor = local.Context.cursor(reply=['guid', 'title', 'keep', 'keep_impl', 'position']) - self.assertEqual(3, cursor.total) + local = IPCClient(mountpoint='~') + + guid_1 = local.post(['context'], { + 'type': 'activity', + 'title': 'title_1', + 'summary': 'summary', + 'description': 'description', + }) + guid_2 = local.post(['context'], { + 'type': 'activity', + 'title': 'title_2', + 'summary': 'summary', + 'description': 'description', + }) + guid_3 = local.post(['context'], { + 'type': 'activity', + 'title': 'title_3', + 'summary': 'summary', + 'description': 'description', + }) + + cursor = local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl', 'position']) + self.assertEqual(3, cursor['total']) self.assertEqual( sorted([ (guid_1, 'title_1', False, 0, [-1, -1]), (guid_2, 'title_2', False, 0, [-1, -1]), (guid_3, 'title_3', False, 0, [-1, -1]), ]), - sorted([(i['guid'], i['title'], i['keep'], i['keep_impl'], i['position']) for i in cursor])) + sorted([(i['guid'], i['title'], i['keep'], i['keep_impl'], i['position']) for i in cursor['result']])) def test_upload_blob(self): self.start_server() - local = Client('~') + local = IPCClient(mountpoint='~') - guid = local.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = local.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) self.touch(('file', 'blob')) - local.Context(guid).upload_blob('preview', 'file') - self.assertEqual('blob', local.Context(guid).get_blob('preview').read()) + local.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = local.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob', file(blob['path']).read()) self.touch(('file2', 'blob2')) - local.Context(guid).upload_blob('preview', 'file2', pass_ownership=True) - self.assertEqual('blob2', local.Context(guid).get_blob('preview').read()) + local.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True) + blob = local.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob2', file(blob['path']).read()) assert not exists('file2') def test_GetAbsetnBLOB(self): self.start_server([Report]) - client = Client('~') + local = IPCClient(mountpoint='~') - guid = client.Report( - context='context', - implementation='implementation', - description='description').post() + guid = local.post(['report'], { + 'context': 'context', + 'implementation': 'implementation', + 'description': 'description', + }) - path, mime_type = client.Report(guid).get_blob_path('data') - self.assertEqual(None, path) - self.assertEqual(True, client.Report(guid).get_blob('data').closed) + self.assertEqual(None, local.get(['report', guid, 'data'], cmd='get_blob')) def test_GetDefaultBLOB(self): self.start_server() - client = Client('~') + local = IPCClient(mountpoint='~') - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = local.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - path, mime_type = client.Context(guid).get_blob_path('icon') - assert path.endswith('missing.png') - assert exists(path) - self.assertEqual(False, client.Context(guid).get_blob('icon').closed) + blob = local.get(['context', guid, 'icon'], cmd='get_blob') + assert blob['path'].endswith('missing.png') + assert exists(blob['path']) def test_Subscription(self): self.start_server() - client = Client('~') - - subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX)) - subscription.connect('run/subscribe') - coroutine.sleep() - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('props') - self.assertEqual( - {'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1}, - event) - self.assertEqual( - {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 1}, - subscription.read_message()) - - client.Context(guid, title='new-title').post() - - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('props') - self.assertEqual( - {'mountpoint': '~', 'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 2}, - event) - self.assertEqual( - {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 2}, - subscription.read_message()) - - client.Context.delete(guid) - - coroutine.select([subscription.fileno()], [], []) - self.assertEqual( - {'mountpoint': '~', 'document': 'context', 'event': 'delete', 'guid': guid}, - subscription.read_message()) - self.assertEqual( - {'mountpoint': '~', 'document': 'context', 'event': 'commit', 'seqno': 2}, - subscription.read_message()) - - def test_Subscription_NotifyOnline(self): - self.start_ipc_and_restful_server() - - local = Client('~') - remote = Client('/') - - guid = remote.Context( - type='activity', - title={'en': 'title'}, - summary={'en': 'summary'}, - description={'en': 'description'}, - keep=True).post() - - subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX)) - subscription.connect('run/subscribe') - coroutine.sleep(1) - - local.Context(guid, keep=False).post() - coroutine.sleep(1) - - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('props') - self.assertEqual( - {'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 4}, - event) - - def test_Connect(self): - self.start_server() - client = Client('~') - - self.assertEqual(True, client.connected) - - def test_Localize(self): - os.environ['LANG'] = 'en_US' - self.start_server() - client = Client('~') - - guid = client.Context( - type='activity', - title='title_en', - summary='summary_en', - description='description_en').post() - - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) - - self.stop_servers() - os.environ['LANG'] = 'ru_RU' - self.start_server() - client = Client('~') - - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) - - res['title'] = 'title_ru' - res['summary'] = 'summary_ru' - res['description'] = 'description_ru' - res.post() - - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_ru', res['title']) - self.assertEqual('summary_ru', res['summary']) - self.assertEqual('description_ru', res['description']) - - self.stop_servers() - os.environ['LANG'] = 'es_ES' - self.start_server() - client = Client('~') - - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) - - self.stop_servers() - os.environ['LANG'] = 'ru_RU' - self.start_server() - client = Client('~') - - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_ru', res['title']) - self.assertEqual('summary_ru', res['summary']) - self.assertEqual('description_ru', res['description']) + local = IPCClient(mountpoint='~') + events = [] + + def read_events(): + for event in local.subscribe(): + if 'props' in event: + event.pop('props') + events.append(event) + job = coroutine.spawn(read_events) + + guid = local.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + coroutine.dispatch() + local.put(['context', guid], { + 'title': 'title_2', + }) + coroutine.dispatch() + local.delete(['context', guid]) + coroutine.sleep(.5) + job.kill() + + self.assertEqual([ + {'guid': guid, 'seqno': 1, 'document': 'context', 'event': 'create'}, + {'guid': guid, 'seqno': 2, 'document': 'context', 'event': 'update', 'mountpoint': '~'}, + {'guid': guid, 'event': 'delete', 'document': 'context', 'mountpoint': '~'}, + ], + events) if __name__ == '__main__': diff --git a/tests/units/injector.py b/tests/units/injector.py index e178e87..a69a2cf 100755 --- a/tests/units/injector.py +++ b/tests/units/injector.py @@ -11,24 +11,26 @@ from os.path import exists from __init__ import tests from active_toolkit import coroutine -from sugar_network import checkin, launch, Client +from sugar_network import checkin, launch from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.resources.implementation import Implementation from sugar_network.local import activities +from sugar_network import IPCClient class InjectorTest(tests.Test): def test_checkin_Online(self): self.start_ipc_and_restful_server([User, Context, Implementation]) - client = Client('/') + remote = IPCClient(mountpoint='/') - context = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + context = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) blob_path = 'remote/context/%s/%s/feed' % (context[:2], context) self.touch( @@ -48,13 +50,14 @@ class InjectorTest(tests.Test): ], [i for i in pipe]) - impl = client.Implementation( - context=context, - license=['GPLv3+'], - version='1', - date=0, - stability='stable', - notes='').post() + impl = remote.post(['implementation'], { + 'context': context, + 'license': 'GPLv3+', + 'version': '1', + 'date': 0, + 'stability': 'stable', + 'notes': '', + }) blob_path = 'remote/context/%s/%s/feed' % (context[:2], context) self.touch( @@ -108,20 +111,22 @@ class InjectorTest(tests.Test): def test_launch_Online(self): self.start_ipc_and_restful_server([User, Context, Implementation]) - client = Client('/') + remote = IPCClient(mountpoint='/') - context = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - impl = client.Implementation( - context=context, - license=['GPLv3+'], - version='1', - date=0, - stability='stable', - notes='').post() + context = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + impl = remote.post(['implementation'], { + 'context': context, + 'license': 'GPLv3+', + 'version': '1', + 'date': 0, + 'stability': 'stable', + 'notes': '', + }) blob_path = 'remote/context/%s/%s/feed' % (context[:2], context) self.touch( @@ -172,13 +177,14 @@ class InjectorTest(tests.Test): ], [i for i in pipe]) - impl_2 = client.Implementation( - context=context, - license=['GPLv3+'], - version='1', - date=0, - stability='stable', - notes='').post() + impl_2 = remote.post(['implementation'], { + 'context': context, + 'license': 'GPLv3+', + 'version': '1', + 'date': 0, + 'stability': 'stable', + 'notes': '', + }) os.unlink('cache/context/%s/%s/feed.meta' % (context[:2], context)) blob_path = 'remote/context/%s/%s/feed' % (context[:2], context) @@ -257,12 +263,13 @@ class InjectorTest(tests.Test): ])) self.start_server() - client = Client('~') + client = IPCClient(mountpoint='~') monitor = coroutine.spawn(activities.monitor, self.mounts.home_volume['context'], ['Activities']) coroutine.sleep() + blob = client.get(['context', 'bundle_id', 'feed'], cmd='get_blob') self.assertEqual( json.dumps({ '1': { @@ -288,7 +295,7 @@ class InjectorTest(tests.Test): }, }, }), - client.Context('bundle_id').get_blob('feed').read()) + blob) if __name__ == '__main__': diff --git a/tests/units/ipc_client.py b/tests/units/ipc_client.py deleted file mode 100755 index b86ac18..0000000 --- a/tests/units/ipc_client.py +++ /dev/null @@ -1,347 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -from os.path import isdir - -from __init__ import tests - -import active_document as ad - -from active_toolkit import coroutine -from sugar_network.resources.user import User -from sugar_network.resources.context import Context -from sugar_network.local.mounts import HomeMount -from sugar_network.local.mountset import Mountset -from sugar_network import Client -from sugar_network.resources.volume import Volume - - -class IPCClientTest(tests.Test): - - def test_RealtimeUpdates(self): - self.start_server() - - client_1 = Client('~') - client_2 = Client('~') - - events_1 = [] - events_2 = [] - - def waiter(cursor, events): - for i in cursor.read_events(): - events.append(i) - - cursor_1 = client_1.Context.cursor(reply=['guid', 'title']) - self.assertEqual(0, cursor_1.total) - coroutine.spawn(waiter, cursor_1, events_1) - - cursor_2 = client_2.Context.cursor(reply=['guid', 'title']) - self.assertEqual(0, cursor_2.total) - coroutine.spawn(waiter, cursor_2, events_2) - - guid_1 = client_1.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - coroutine.sleep(.1) - self.assertEqual([None], events_1) - self.assertEqual([None], events_2) - self.assertEqual( - sorted([ - (guid_1, 'title'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - (guid_1, 'title'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - client_1.Context(guid_1, title='title-2').post() - coroutine.sleep(.1) - self.assertEqual([None], events_1[1:]) - self.assertEqual([None], events_2[1:]) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - guid_2 = client_2.Context( - type='activity', - title='title-3', - summary='summary', - description='description').post() - coroutine.sleep(.1) - self.assertEqual([None], events_1[2:]) - self.assertEqual([None], events_2[2:]) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - (guid_2, 'title-3'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - (guid_2, 'title-3'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - client_2.Context(guid_2, title='title-4').post() - coroutine.sleep(.1) - self.assertEqual([None], events_1[3:]) - self.assertEqual([None], events_2[3:]) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - (guid_2, 'title-4'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - (guid_1, 'title-2'), - (guid_2, 'title-4'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - client_1.Context.delete(guid_1) - coroutine.sleep(.1) - self.assertEqual([None], events_1[4:]) - self.assertEqual([None], events_2[4:]) - self.assertEqual( - sorted([ - (guid_2, 'title-4'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - (guid_2, 'title-4'), - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - client_2.Context.delete(guid_2) - coroutine.sleep(.1) - self.assertEqual([None], events_1[5:]) - self.assertEqual([None], events_2[5:]) - self.assertEqual( - sorted([ - ]), - sorted([(i['guid'], i['title']) for i in cursor_1])) - self.assertEqual( - sorted([ - ]), - sorted([(i['guid'], i['title']) for i in cursor_2])) - - def test_ReplaceReadEventsCalls(self): - self.start_server() - - client = Client('~') - cursor = client.Context.cursor(reply=['guid', 'title']) - - def waiter(cursor, events): - for i in cursor.read_events(): - events.append(i) - - events_1 = [] - coroutine.spawn(waiter, cursor, events_1) - coroutine.sleep() - - events_2 = [] - coroutine.spawn(waiter, cursor, events_2) - coroutine.sleep() - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - coroutine.sleep() - - self.assertEqual([], events_1) - self.assertEqual([None], events_2) - - events_3 = [] - coroutine.spawn(waiter, cursor, events_3) - coroutine.sleep() - - client.Context(guid, title='title-1').post() - coroutine.sleep() - - self.assertEqual([], events_1) - self.assertEqual([None], events_2) - self.assertEqual([None], events_3) - - def test_Cursor_Gets(self): - self.start_server() - - client = Client('~') - - guid_1 = client.Context( - type='activity', - title='title-1', - summary='summary', - description='description').post() - guid_2 = client.Context( - type='activity', - title='title-2', - summary='summary', - description='description').post() - guid_3 = client.Context( - type='activity', - title='title-3', - summary='summary', - description='description').post() - - cursor = client.Context.cursor(reply=['guid', 'title']) - self.assertEqual('title-1', cursor[0]['title']) - self.assertEqual('title-2', cursor[1]['title']) - self.assertEqual('title-3', cursor[2]['title']) - self.assertEqual('title-1', cursor[guid_1]['title']) - self.assertEqual('title-2', cursor[guid_2]['title']) - self.assertEqual('title-3', cursor[guid_3]['title']) - - cursor = client.Context.cursor('FOO', reply=['guid', 'title']) - self.assertEqual(0, cursor.total) - self.assertEqual('title-1', cursor[guid_1]['title']) - self.assertEqual('title-2', cursor[guid_2]['title']) - self.assertEqual('title-3', cursor[guid_3]['title']) - - def test_ConnectEventsInCursor(self): - - def remote_server(): - coroutine.sleep(1) - self.restful_server() - - pid = self.fork(self.restful_server) - - self.start_server() - client = Client('/') - - events = [] - cursor = client.Context.cursor(reply=['guid', 'title']) - - def waiter(): - for i in cursor.read_events(): - events.append(i) - - coroutine.spawn(waiter) - coroutine.sleep(.1) - - self.assertEqual([], events) - - coroutine.sleep(1) - - self.assertEqual([None], events) - - self.waitpid(pid) - coroutine.sleep(1) - - self.assertEqual([None, None], events) - - def test_PublishEvents(self): - self.start_server() - - events = [] - Client.connect(lambda event: events.append(event)) - - Client.publish('probe', payload=1) - Client.publish('probe', payload=2) - Client.publish('probe', payload=3) - coroutine.sleep() - - self.assertEqual([ - {'payload': 1, 'event': 'probe'}, - {'payload': 2, 'event': 'probe'}, - {'payload': 3, 'event': 'probe'}, - ], - events) - - def test_GetBLOBs(self): - - class Mounts(object): - - def call(self_, request, response): - if not (request['cmd'] == 'get_blob' and \ - request['document'] == 'document' and \ - 'guid' in request): - return - if request['prop'] == 'directory': - os.makedirs('directory') - return {'path': tests.tmpdir + '/directory', 'mime_type': 'fake'} - elif request['prop'] == 'file': - self.touch(('file', 'file')) - return {'path': tests.tmpdir + '/file', 'mime_type': 'fake'} - elif request['prop'] == 'value': - return 'value' - - def close(self): - pass - - self.start_server([]) - self.server._mounts = Mounts() - client = Client('~') - - blob = client.Document('guid').get_blob_path('directory') - self.assertEqual(tests.tmpdir + '/directory', blob[0]) - self.assertEqual('fake', blob[1]) - assert isdir('directory') - - blob = client.Document('guid').get_blob('file') - self.assertEqual(tests.tmpdir + '/file', blob.name) - self.assertEqual('fake', blob.mime_type) - self.assertEqual('file', blob.read()) - - blob = client.Document('guid').get_blob('value') - self.assertEqual('value', blob.read()) - - def test_Direct(self): - volume = Volume('local', [User, Context]) - Client._connection = Mountset(volume) - Client._connection['~'] = HomeMount(volume) - Client._connection.open() - client = Client('~') - - guid_1 = client.Context( - type='activity', - title='title-1', - summary='summary', - description='description').post() - guid_2 = client.Context( - type='activity', - title='title-2', - summary='summary', - description='description').post() - - self.assertEqual( - 'title-1', - client.Context(guid_1, reply=['title'])['title']) - - self.assertEqual( - sorted([ - (guid_1, 'title-1'), - (guid_2, 'title-2'), - ]), - sorted([(i.guid, i['title']) \ - for i in client.Context.cursor(reply=['title'])])) - - self.touch(('file', 'blob')) - client.Context(guid_2).upload_blob('preview', 'file') - self.assertEqual( - 'blob', - client.Context(guid_2).get_blob('preview').read()) - - client.Context.delete(guid_1) - client.Context.delete(guid_2) - self.assertEqual(0, client.Context.cursor().total) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/mountset.py b/tests/units/mountset.py index 9188509..c26435c 100755 --- a/tests/units/mountset.py +++ b/tests/units/mountset.py @@ -11,21 +11,27 @@ from __init__ import tests import active_document as ad from active_toolkit import coroutine, sockets from sugar_network.local.mountset import Mountset -from sugar_network.local.bus import IPCServer from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.toolkit import http, mounts_monitor -from sugar_network import local, Client, ServerError, sugar, node +from sugar_network import local, sugar, node from sugar_network.resources.volume import Volume from sugar_network.local.mounts import HomeMount, RemoteMount +from sugar_network.local.ipc_client import Router as IPCRouter +from sugar_network import IPCClient, Client class MountsetTest(tests.Test): def setUp(self): tests.Test.setUp(self) + self.events_job = None + self.events = [] + self.mounted = coroutine.Event() def tearDown(self): + if self.events_job is not None: + self.events_job.kill() tests.Test.tearDown(self) def mountset(self): @@ -33,21 +39,23 @@ class MountsetTest(tests.Test): volume = Volume('local', [User, Context]) mounts = Mountset(volume) - Client._connection = mounts - self.mounted = coroutine.Event() + self.server = coroutine.WSGIServer( + ('localhost', local.ipc_port.value), IPCRouter(mounts)) + coroutine.spawn(self.server.serve_forever) + mounts.open() + mounts.opened.wait() - def events_cb(event): - if event['event'] in ('mount', 'unmount'): - self.mount_events.append((event['event'], event['mountpoint'])) + def read_events(): + for event in IPCClient().subscribe(): + if 'props' in event: + event.pop('props') + self.events.append(event) self.mounted.set() - self.mount_events = [] - Client.connect(events_cb) - - mounts.open() - mounts.opened.wait() + coroutine.dispatch() + self.events_job = coroutine.spawn(read_events) + coroutine.sleep(.5) mounts_monitor.start(tests.tmpdir) - # Let `open()` start processing spawned jobs coroutine.dispatch() return mounts @@ -56,183 +64,118 @@ class MountsetTest(tests.Test): os.makedirs('1/.sugar-network') os.makedirs('2/.sugar-network') - self.mountset() + mounts = self.mountset() + mounts[tests.tmpdir + '/1'].mounted.wait() + mounts[tests.tmpdir + '/2'].mounted.wait() - self.mounted.wait() - self.mounted.clear() self.assertEqual([ - ('mount', tests.tmpdir + '/1'), - ('mount', tests.tmpdir + '/2'), + {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, + {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'}, ], - self.mount_events) - - Client(tests.tmpdir + '/1').Context( - type='activity', - title='remote', - summary='summary', - description='description').post() - Client(tests.tmpdir + '/2').Context( - type='activity', - title='remote', - summary='summary', - description='description').post() + self.events) self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True}, {'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True}, ]), - sorted(Client.mounts())) + sorted(IPCClient().get(cmd='mounts'))) def test_Mount(self): - self.mountset() + mounts = self.mountset() os.makedirs('tmp/1/.sugar-network') shutil.move('tmp/1', '.') - self.mounted.wait() self.mounted.clear() - self.assertEqual( - [('mount', tests.tmpdir + '/1')], - self.mount_events) + + self.assertEqual([ + {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, + ], + self.events) self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True}, ]), - sorted(Client.mounts())) - Client(tests.tmpdir + '/1').Context( - type='activity', - title='remote', - summary='summary', - description='description').post() + sorted(IPCClient().get(cmd='mounts'))) os.makedirs('tmp/2/.sugar-network') shutil.move('tmp/2', '.') - self.mounted.wait() self.mounted.clear() - self.assertEqual( - [('mount', tests.tmpdir + '/2')], - self.mount_events[1:]) + + self.assertEqual([ + {'mountpoint': tests.tmpdir + '/1', 'event': 'mount', 'private': True, 'name': '1'}, + {'mountpoint': tests.tmpdir + '/2', 'event': 'mount', 'private': True, 'name': '2'}, + ], + self.events) self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True}, {'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True}, ]), - sorted(Client.mounts())) - Client(tests.tmpdir + '/2').Context( - type='activity', - title='remote', - summary='summary', - description='description').post() + sorted(IPCClient().get(cmd='mounts'))) def test_Unmount(self): os.makedirs('1/.sugar-network') os.makedirs('2/.sugar-network') - self.mountset() - self.mounted.wait() - self.mounted.clear() + mounts = self.mountset() + client = IPCClient() + self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/1', 'name': '1', 'private': True}, {'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True}, ]), - sorted(Client.mounts())) + sorted(client.get(cmd='mounts'))) - del self.mount_events[:] + self.mounted.clear() + del self.events[:] shutil.rmtree('1') self.mounted.wait() self.mounted.clear() - self.assertEqual( - [('unmount', tests.tmpdir + '/1')], - self.mount_events) + + self.assertEqual([ + {'mountpoint': tests.tmpdir + '/1', 'event': 'unmount', 'private': True, 'name': '1'}, + ], + self.events) self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/2', 'name': '2', 'private': True}, ]), - sorted(Client.mounts())) + sorted(client.get(cmd='mounts'))) def test_MountNode(self): local.server_mode.value = True - self.mountset() + mounts = self.mountset() self.touch('tmp/mnt/.sugar-network') self.touch(('tmp/mnt/node', 'node')) shutil.move('tmp/mnt', '.') - self.mounted.wait() self.mounted.clear() - client = Client(tests.tmpdir + '/mnt') + self.assertEqual([ + {'mountpoint': tests.tmpdir + '/mnt', 'event': 'mount', 'private': False, 'name': 'mnt'}, + ], + self.events) self.assertEqual( sorted([ {'mountpoint': tests.tmpdir + '/mnt', 'name': 'mnt', 'private': False}, ]), - sorted(Client.mounts())) - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - local.api_url.value = 'http://localhost:%s' % node.port.value - self.assertEqual( - {'guid': guid, 'title': {'en-US': 'title'}}, - http.request('GET', ['context', guid], params={'reply': 'guid,title'})) - - def test_RootRemount(self): - mountset = self.mountset() - client = Client('/') - - mountset['/'] = RemoteMount(mountset.home_volume) - self.assertEqual(['/'], [i['mountpoint'] for i in mountset.mounts()]) - assert not mountset['/'].mounted.is_set() - self.assertEqual(0, client.Context.cursor().total) - self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post) - - pid = self.fork(self.restful_server) - coroutine.sleep(1) - - self.assertEqual([], self.mount_events) - self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post) - self.mounted.wait() - self.mounted.clear() - assert mountset['/'].mounted.is_set() - self.assertEqual([('mount', '/')], self.mount_events) - del self.mount_events[:] - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description', - ).post() + sorted(IPCClient().get(cmd='mounts'))) + + client = Client('http://localhost:%s' % node.port.value) + guid = client.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) self.assertEqual( - [guid], - [i['guid'] for i in client.Context.cursor()]) - - self.waitpid(pid) - - self.mounted.wait() - self.mounted.clear() - assert not mountset['/'].mounted.is_set() - self.assertEqual([('unmount', '/')], self.mount_events) - del self.mount_events[:] - self.assertEqual(0, client.Context.cursor().total) - self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post) - - pid = self.fork(self.restful_server) - coroutine.sleep(1) - - self.assertEqual([], self.mount_events) - self.assertRaises(RuntimeError, client.Context(type='activity', title='', summary='', description='').post) - self.mounted.wait() - self.mounted.clear() - assert mountset['/'].mounted.is_set() - self.assertEqual([('mount', '/')], self.mount_events) - del self.mount_events[:] + 'title', + client.get(['context', guid, 'title'])) if __name__ == '__main__': diff --git a/tests/units/node.py b/tests/units/node.py index 4ddae32..66e92e5 100755 --- a/tests/units/node.py +++ b/tests/units/node.py @@ -8,7 +8,8 @@ from __init__ import tests import active_document as ad from sugar_network import node -from sugar_network.node import stats, Unauthorized +from sugar_network.toolkit.router import Unauthorized +from sugar_network.node import stats from sugar_network.node.commands import NodeCommands from sugar_network.resources.volume import Volume from sugar_network.resources.user import User @@ -102,9 +103,10 @@ class NodeTest(tests.Test): assert exists(guid_path) self.assertEqual({ 'guid': guid, - 'title': {'en': 'title'}, + 'title': 'title', + 'layer': ['public'], }, - call(cp, method='GET', document='context', guid=guid, reply=['guid', 'title'])) + call(cp, method='GET', document='context', guid=guid, reply=['guid', 'title', 'layer'])) self.assertEqual(['public'], volume['context'].get(guid)['layer']) call(cp, method='DELETE', document='context', guid=guid, principal='principal') @@ -296,30 +298,30 @@ class NodeTest(tests.Test): volume['context'].set_blob(guid5, 'icon', url={'file1': {'order': 1, 'url': '/1'}, 'file2': {'order': 2, 'url': 'http://2'}}) self.assertEqual( - {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png'}, - call(cp, method='GET', document='context', guid=guid1, reply=['guid', 'icon'])) + {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png', 'layer': ['public']}, + call(cp, method='GET', document='context', guid=guid1, reply=['guid', 'icon', 'layer'])) self.assertEqual( - {'guid': guid2, 'icon': 'http://foo/bar'}, - call(cp, method='GET', document='context', guid=guid2, reply=['guid', 'icon'])) + {'guid': guid2, 'icon': 'http://foo/bar', 'layer': ['public']}, + call(cp, method='GET', document='context', guid=guid2, reply=['guid', 'icon', 'layer'])) self.assertEqual( - {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar'}, - call(cp, method='GET', document='context', guid=guid3, reply=['guid', 'icon'])) + {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar', 'layer': ['public']}, + call(cp, method='GET', document='context', guid=guid3, reply=['guid', 'icon', 'layer'])) self.assertEqual( - {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4}, - call(cp, method='GET', document='report', guid=guid4, reply=['guid', 'data'])) + {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4, 'layer': ['public']}, + call(cp, method='GET', document='report', guid=guid4, reply=['guid', 'data', 'layer'])) self.assertEqual([ - {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png'}, - {'guid': guid2, 'icon': 'http://foo/bar'}, - {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar'}, - {'guid': guid5, 'icon': ['http://localhost:8000/1', 'http://2']}, + {'guid': guid1, 'icon': 'http://localhost:8000/static/images/missing.png', 'layer': ['public']}, + {'guid': guid2, 'icon': 'http://foo/bar', 'layer': ['public']}, + {'guid': guid3, 'icon': 'http://localhost:8000/foo/bar', 'layer': ['public']}, + {'guid': guid5, 'icon': ['http://localhost:8000/1', 'http://2'], 'layer': ['public']}, ], - call(cp, method='GET', document='context', reply=['guid', 'icon'])['result']) + call(cp, method='GET', document='context', reply=['guid', 'icon', 'layer'])['result']) self.assertEqual([ - {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4}, + {'guid': guid4, 'data': 'http://localhost:8000/report/%s/data' % guid4, 'layer': ['public']}, ], - call(cp, method='GET', document='report', reply=['guid', 'data'])['result']) + call(cp, method='GET', document='report', reply=['guid', 'data', 'layer'])['result']) def test_DeletedDocuments(self): volume = Volume('db') diff --git a/tests/units/node_mount.py b/tests/units/node_mount.py index eed94f7..6e864a3 100755 --- a/tests/units/node_mount.py +++ b/tests/units/node_mount.py @@ -13,164 +13,149 @@ import active_document as ad from active_toolkit import coroutine, sockets from sugar_network.local.mounts import HomeMount from sugar_network.local.mountset import Mountset -from sugar_network.local.bus import IPCServer from sugar_network.local import activities from sugar_network.toolkit import mounts_monitor from sugar_network.resources.user import User from sugar_network.resources.context import Context -from sugar_network import local, Client, sugar +from sugar_network import local, sugar from sugar_network.resources.volume import Volume from sugar_network.resources.report import Report +from sugar_network.local.ipc_client import Router as IPCRouter +from sugar_network import IPCClient class NodeMountTest(tests.Test): def setUp(self): tests.Test.setUp(self) + self.events_job = None local.server_mode.value = True def tearDown(self): + if self.events_job is not None: + self.events_job.kill() tests.Test.tearDown(self) - def start_server(self, ipc=False): + def start_server(self): + self.touch('mnt/.sugar-network') + self.touch(('mnt/node', 'node')) local.mounts_root.value = tests.tmpdir volume = Volume('local', [User, Context, Report]) - mounts = Mountset(volume) - if ipc: - self.server = IPCServer(mounts) - coroutine.spawn(self.server.serve_forever) - coroutine.dispatch() - else: - Client._connection = mounts - self.got_event = coroutine.Event() - - def events_cb(event): - if event['event'] in ('mount', 'unmount') and \ - event['mountpoint'].startswith(local.mounts_root.value): - self.events.append((event['event'], event['mountpoint'])) - self.got_event.set() - - self.events = [] - Client.connect(events_cb) - - mounts.open() + self.mounts = Mountset(volume) + self.server = coroutine.WSGIServer( + ('localhost', local.ipc_port.value), IPCRouter(self.mounts)) + coroutine.spawn(self.server.serve_forever) + self.mounts.open() mounts_monitor.start(tests.tmpdir) - mounts.opened.wait() - # Let `open()` start processing spawned jobs - coroutine.dispatch() + self.mounts.opened.wait() - return mounts + return self.mounts def test_GetKeep(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) mounts = self.start_server() - self.got_event.wait() - - remote = Client(tests.tmpdir + '/mnt') + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') - guid = remote.Context( - type='activity', - title='remote', - summary='summary', - description='description').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - context = remote.Context(guid, ['keep', 'keep_impl']) + context = remote.get(['context', guid], reply=['keep', 'keep_impl']) self.assertEqual(False, context['keep']) self.assertEqual(0, context['keep_impl']) - self.assertEqual( - [(guid, False, False)], - [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])]) + self.assertEqual([ + {'guid': guid, 'keep': False, 'keep_impl': False}, + ], + remote.get(['context'], reply=['guid', 'keep', 'keep_impl'])['result']) mounts.home_volume['context'].create(guid=guid, type='activity', - title={'en': 'local'}, summary={'en': 'summary'}, - description={'en': 'description'}, keep=True, keep_impl=2, + title='local', summary='summary', + description='description', keep=True, keep_impl=2, user=[sugar.uid()]) + coroutine.sleep(1) - context = remote.Context(guid, ['keep', 'keep_impl']) + context = remote.get(['context', guid], reply=['keep', 'keep_impl']) self.assertEqual(True, context['keep']) self.assertEqual(2, context['keep_impl']) - self.assertEqual( - [(guid, True, 2)], - [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])]) + self.assertEqual([ + {'guid': guid, 'keep': True, 'keep_impl': 2}, + ], + remote.get(['context'], reply=['guid', 'keep', 'keep_impl'])['result']) def test_SetKeep(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() mounts['~'] = HomeMount(mounts.home_volume) - self.got_event.wait() - remote = Client(tests.tmpdir + '/mnt') - local = Client('~') - - guid_1 = remote.Context( - type=['activity'], - title='remote', - summary='summary', - description='description').post() - guid_2 = remote.Context( - type=['activity'], - title='remote-2', - summary='summary', - description='description').post() - - self.assertRaises(ad.NotFound, lambda: local.Context(guid_1, reply=['title'])['title']) - self.assertRaises(ad.NotFound, lambda: local.Context(guid_2, reply=['title'])['title']) - - remote.Context(guid_1, keep=True).post() - - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) + local = IPCClient(mountpoint='~') + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') + + guid_1 = remote.post(['context'], { + 'type': 'activity', + 'title': 'remote', + 'summary': 'summary', + 'description': 'description', + }) + guid_2 = remote.post(['context'], { + 'type': 'activity', + 'title': 'remote-2', + 'summary': 'summary', + 'description': 'description', + }) + + self.assertRaises(RuntimeError, local.get, ['context', guid_1]) + self.assertRaises(RuntimeError, local.get, ['context', guid_2]) + + remote.put(['context', guid_1], {'keep': True}) + self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) - remote.Context(guid_1, keep=False).post() + remote.put(['context', guid_1], {'keep': False}) - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) self.assertEqual( sorted([ - (guid_1, 'remote', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', False, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) - context = local.Context(guid_1) - context['title'] = 'local' - context.post() - context = local.Context(guid_1, reply=['keep', 'keep_impl', 'title']) - self.assertEqual('local', context['title']) + local.put(['context', guid_1], {'title': 'local'}) - remote.Context(guid_1, keep=True).post() + self.assertEqual( + {'title': 'local'}, + local.get(['context', guid_1], reply=['title'])) + + remote.put(['context', guid_1], {'keep': True}) - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) self.assertEqual( sorted([ - (guid_1, 'local', True, 0), + {'guid': guid_1, 'title': 'local', 'keep': True, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) def test_SetKeepImpl(self): Volume.RESOURCES = [ @@ -179,27 +164,28 @@ class NodeMountTest(tests.Test): 'sugar_network.resources.implementation', ] - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - mounts = self.start_server(ipc=True) + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() mounts['~'] = HomeMount(mounts.home_volume) - self.got_event.wait() - remote = Client(tests.tmpdir + '/mnt') - local = Client('~') + local = IPCClient(mountpoint='~') + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') coroutine.spawn(activities.monitor, mounts.home_volume['context'], ['Activities']) - context = remote.Context( - type=['activity'], - title='remote', - summary='summary', - description='description').post() - impl = remote.Implementation( - context=context, - license=['GPLv3+'], - version='1', - date=0, - stability='stable', - notes='').post() + context = remote.post(['context'], { + 'type': 'activity', + 'title': 'remote', + 'summary': 'summary', + 'description': 'description', + }) + impl = remote.post(['implementation'], { + 'context': context, + 'license': 'GPLv3+', + 'version': '1', + 'date': 0, + 'stability': 'stable', + 'notes': '', + }) + with file('mnt/context/%s/%s/feed' % (context[:2], context), 'w') as f: json.dump({ 'seqno': 0, @@ -233,125 +219,120 @@ class NodeMountTest(tests.Test): 'license=Public Domain', ])) bundle.close() - remote.Implementation(impl).upload_blob('data', 'bundle') + remote.put(['implementation', impl, 'data'], cmd='upload_blob', path=abspath('bundle')) - remote.Context(context, keep_impl=1).post() + remote.put(['context', context], {'keep_impl': 1}) + coroutine.sleep(1) - cursor = local.Context.cursor(reply=['keep_impl', 'title']) + cursor = local.get(['context'], reply=['guid', 'keep_impl', 'title'])['result'] self.assertEqual([ - (context, 'remote', 2), + {'guid': context, 'title': 'remote', 'keep_impl': 2}, ], - [(i.guid, i['title'], i['keep_impl']) for i in cursor]) + cursor) assert exists('Activities/TestActivitry/activity/activity.info') def test_Events(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - self.start_server() - self.got_event.wait() - self.got_event.clear() - client = Client(tests.tmpdir + '/mnt') - - def events_cb(event): - if 'props' in event: - event.pop('props') - events.append(event) - got_commit.set() - got_commit.clear() + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') events = [] - got_commit = coroutine.Event() - Client.connect(events_cb) - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - got_commit.wait() + got_event = coroutine.Event() + + def read_events(): + for event in remote.subscribe(): + if 'props' in event: + event.pop('props') + events.append(event) + got_event.set() + job = coroutine.spawn(read_events) + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + got_event.wait() + got_event.clear() self.assertEqual([ {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'create', 'guid': guid, 'seqno': 1}, - {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 1}, ], events) del events[:] - client.Context(guid, title='new-title').post() - got_commit.wait() + remote.put(['context', guid], {'title': 'new-title'}) + got_event.wait() + got_event.clear() self.assertEqual([ {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 2}, - {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 2}, ], events) del events[:] guid_path = 'mnt/context/%s/%s' % (guid[:2], guid) assert exists(guid_path) - client.Context.delete(guid) + remote.delete(['context', guid]) assert not exists(guid_path) - got_commit.wait() + got_event.wait() + got_event.clear() self.assertEqual([ {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'delete', 'guid': guid}, - {'mountpoint': tests.tmpdir + '/mnt', 'document': 'context', 'event': 'commit', 'seqno': 2}, ], events) del events[:] def test_upload_blob(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - self.start_server() - self.got_event.wait() - remote = Client(tests.tmpdir + '/mnt') + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') - guid = remote.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) self.touch(('file', 'blob')) - remote.Context(guid).upload_blob('preview', 'file') - self.assertEqual('blob', remote.Context(guid).get_blob('preview').read()) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob', file(blob['path']).read()) self.touch(('file2', 'blob2')) - remote.Context(guid).upload_blob('preview', 'file2', pass_ownership=True) - self.assertEqual('blob2', remote.Context(guid).get_blob('preview').read()) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob2', file(blob['path']).read()) assert not exists('file2') - def test_GetAbsetnBLOB(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - self.start_server() - self.got_event.wait() - client = Client(tests.tmpdir + '/mnt') + def test_GetAbsentBLOB(self): + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') - guid = client.Report( - context='context', - implementation='implementation', - description='description').post() + guid = remote.post(['report'], { + 'context': 'context', + 'implementation': 'implementation', + 'description': 'description', + }) - path, mime_type = client.Report(guid).get_blob_path('data') - self.assertEqual(None, path) - self.assertEqual(True, client.Report(guid).get_blob('data').closed) + self.assertEqual(None, remote.get(['report', guid, 'data'], cmd='get_blob')) def test_GetDefaultBLOB(self): - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - self.start_server() - self.got_event.wait() - client = Client(tests.tmpdir + '/mnt') + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - path, mime_type = client.Context(guid).get_blob_path('icon') - assert path.endswith('missing.png') - assert exists(path) - self.assertEqual(False, client.Context(guid).get_blob('icon').closed) + blob = remote.get(['context', guid, 'icon'], cmd='get_blob') + assert blob['path'].endswith('missing.png') + assert exists(blob['path']) def test_get_blob_ExtractImplementations(self): Volume.RESOURCES = [ @@ -359,28 +340,27 @@ class NodeMountTest(tests.Test): 'sugar_network.resources.implementation', ] - self.touch('mnt/.sugar-network') - self.touch(('mnt/node', 'node')) - self.start_server() - self.got_event.wait() - remote = Client(tests.tmpdir + '/mnt') - - guid = remote.Implementation( - context='context', - license=['GPLv3+'], - version='1', - date=0, - stability='stable', - notes='').post() + mounts = self.start_server() + mounts[tests.tmpdir + '/mnt'].mounted.wait() + remote = IPCClient(mountpoint=tests.tmpdir + '/mnt') + + guid = remote.post(['implementation'], { + 'context': 'context', + 'license': 'GPLv3+', + 'version': '1', + 'date': 0, + 'stability': 'stable', + 'notes': '', + }) bundle = zipfile.ZipFile('bundle', 'w') bundle.writestr('probe', 'probe') bundle.close() - remote.Implementation(guid).upload_blob('data', 'bundle') + remote.put(['implementation', guid, 'data'], cmd='upload_blob', path=abspath('bundle')) - path, __ = remote.Implementation(guid).get_blob_path('data') - self.assertEqual(abspath('cache/implementation/%s/%s/data' % (guid[:2], guid)), path) - self.assertEqual('probe', file(join(path, 'probe')).read()) + blob = remote.get(['implementation', guid, 'data'], cmd='get_blob') + self.assertEqual(abspath('cache/implementation/%s/%s/data' % (guid[:2], guid)), blob['path']) + self.assertEqual('probe', file(join(blob['path'], 'probe')).read()) if __name__ == '__main__': diff --git a/tests/units/remote_mount.py b/tests/units/remote_mount.py index b06e686..7ac5610 100755 --- a/tests/units/remote_mount.py +++ b/tests/units/remote_mount.py @@ -5,380 +5,348 @@ import os import json import socket from cStringIO import StringIO -from os.path import exists +from os.path import exists, abspath from __init__ import tests from active_toolkit import sockets, coroutine -from sugar_network import Client, ServerError +from sugar_network import local +from sugar_network.local.ipc_client import Router as IPCRouter from sugar_network.local.mounts import RemoteMount from sugar_network.local.mountset import Mountset -from sugar_network.local.bus import IPCServer from sugar_network.toolkit import sugar, http from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.resources.report import Report from sugar_network.resources.volume import Volume +from sugar_network import IPCClient class RemoteMountTest(tests.Test): def test_GetKeep(self): self.start_ipc_and_restful_server() + remote = IPCClient(mountpoint='/') - remote = Client('/') + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - guid = remote.Context( - type='activity', - title='remote', - summary='summary', - description='description').post() - - context = remote.Context(guid, ['keep', 'keep_impl']) + context = remote.get(['context', guid], reply=['keep', 'keep_impl']) self.assertEqual(False, context['keep']) self.assertEqual(0, context['keep_impl']) + cursor = remote.get(['context'], reply=['keep', 'keep_impl'])['result'] self.assertEqual( [(guid, False, False)], - [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])]) + [(i['guid'], i['keep'], i['keep_impl']) for i in cursor]) self.mounts.home_volume['context'].create(guid=guid, type='activity', title={'en': 'local'}, summary={'en': 'summary'}, description={'en': 'description'}, keep=True, keep_impl=2, user=[sugar.uid()]) - context = remote.Context(guid, ['keep', 'keep_impl']) + context = remote.get(['context', guid], reply=['keep', 'keep_impl']) self.assertEqual(True, context['keep']) self.assertEqual(2, context['keep_impl']) + cursor = remote.get(['context'], reply=['keep', 'keep_impl'])['result'] self.assertEqual( [(guid, True, 2)], - [(i['guid'], i['keep'], i['keep_impl']) for i in remote.Context.cursor(reply=['keep', 'keep_impl'])]) + [(i['guid'], i['keep'], i['keep_impl']) for i in cursor]) def test_SetKeep(self): self.start_ipc_and_restful_server() + remote = IPCClient(mountpoint='/') + local = IPCClient(mountpoint='~') + + guid_1 = remote.post(['context'], { + 'type': 'activity', + 'title': 'remote', + 'summary': 'summary', + 'description': 'description', + }) + guid_2 = remote.post(['context'], { + 'type': 'activity', + 'title': 'remote-2', + 'summary': 'summary', + 'description': 'description', + }) + + self.assertRaises(RuntimeError, local.get, ['context', guid_1]) + self.assertRaises(RuntimeError, local.get, ['context', guid_2]) + + remote.put(['context', guid_1], {'keep': True}) - remote = Client('/') - local = Client('~') - - guid_1 = remote.Context( - type=['activity'], - title='remote', - summary='summary', - description='description').post() - guid_2 = remote.Context( - type=['activity'], - title='remote-2', - summary='summary', - description='description').post() - - self.assertRaises(ServerError, lambda: local.Context(guid_1, reply=['title'])['title']) - self.assertRaises(ServerError, lambda: local.Context(guid_2, reply=['title'])['title']) - - remote.Context(guid_1, keep=True).post() - - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) - remote.Context(guid_1, keep=False).post() + remote.put(['context', guid_1], {'keep': False}) - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) self.assertEqual( sorted([ - (guid_1, 'remote', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', False, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': False, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) - context = local.Context(guid_1) - context['title'] = 'local' - context.post() - context = local.Context(guid_1, reply=['keep', 'keep_impl', 'title']) - self.assertEqual('local', context['title']) + local.put(['context', guid_1], {'title': 'local'}) - remote.Context(guid_1, keep=True).post() + self.assertEqual( + {'title': 'local'}, + local.get(['context', guid_1], reply=['title'])) + + remote.put(['context', guid_1], {'keep': True}) - cursor = local.Context.cursor(reply=['keep', 'keep_impl', 'title']) self.assertEqual( sorted([ - (guid_1, 'local', True, 0), + {'guid': guid_1, 'title': 'local', 'keep': True, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) - cursor = remote.Context.cursor(reply=['keep', 'keep_impl', 'title']) + sorted(local.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) self.assertEqual( sorted([ - (guid_1, 'remote', True, 0), - (guid_2, 'remote-2', False, 0), + {'guid': guid_1, 'title': 'remote', 'keep': True, 'keep_impl': 0}, + {'guid': guid_2, 'title': 'remote-2', 'keep': False, 'keep_impl': 0}, ]), - sorted([(i.guid, i['title'], i['keep'], i['keep_impl']) for i in cursor])) + sorted(remote.get(['context'], reply=['guid', 'title', 'keep', 'keep_impl'])['result'])) def test_Subscription(self): - self.fork(self.restful_server) - coroutine.sleep(1) - - self.start_server() - client = Client('/') - - subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX)) - subscription.connect('run/subscribe') - coroutine.sleep(1) - - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - self.assertEqual( - {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, - subscription.read_message()) - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('props') - event.pop('seqno') - self.assertEqual( - {'mountpoint': '/', 'document': 'context', 'event': 'create', 'guid': guid}, - event) - - client.Context(guid, title='new-title').post() - - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('props') - event.pop('seqno') - self.assertEqual( - {'mountpoint': '/', 'document': 'context', 'event': 'update', 'guid': guid}, - event) - - client.Context.delete(guid) - - coroutine.select([subscription.fileno()], [], []) - event = subscription.read_message() - event.pop('seqno') - self.assertEqual( - {'mountpoint': '/', 'document': 'context', 'event': 'delete', 'guid': guid}, - event) - - def test_Connect(self): + self.start_ipc_and_restful_server() + remote = IPCClient(mountpoint='/') + events = [] + + def read_events(): + for event in remote.subscribe(): + if 'props' in event: + event.pop('props') + events.append(event) + job = coroutine.spawn(read_events) + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + coroutine.dispatch() + remote.put(['context', guid], { + 'title': 'title_2', + }) + coroutine.dispatch() + remote.delete(['context', guid]) + coroutine.sleep(.5) + job.kill() + + self.assertEqual([ + {'guid': guid, 'seqno': 2, 'document': 'context', 'event': 'create', 'mountpoint': '/'}, + {'guid': guid, 'seqno': 3, 'document': 'context', 'event': 'update', 'mountpoint': '/'}, + {'guid': guid, 'seqno': 4, 'event': 'delete', 'document': 'context', 'mountpoint': '/'}, + ], + events) + + def test_Subscription_NotifyOnline(self): + self.start_ipc_and_restful_server() + remote = IPCClient(mountpoint='/') + local = IPCClient(mountpoint='~') + events = [] + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) + + def read_events(): + for event in remote.subscribe(): + if 'props' in event: + event.pop('props') + events.append(event) + + job = coroutine.spawn(read_events) + local.put(['context', guid], {'keep': False}) + coroutine.sleep(.5) + job.kill() + + self.assertEqual([ + {'document': 'context', 'event': 'update', 'guid': guid, 'seqno': 1}, + ], + events) + + def test_Mount(self): pid = self.fork(self.restful_server) - volume = Volume('local', [User, Context]) - mounts = Mountset(volume) - mounts['/'] = RemoteMount(volume) - ipc_server = IPCServer(mounts) - coroutine.spawn(ipc_server.serve_forever) - client = Client('/') - subscription = sockets.SocketFile(coroutine.socket(socket.AF_UNIX)) - subscription.connect('run/subscribe') - - mounts.open() + volume = Volume('local', [User, Context]) + self.mounts = Mountset(volume) + self.mounts['/'] = RemoteMount(volume) + self.server = coroutine.WSGIServer( + ('localhost', local.ipc_port.value), IPCRouter(self.mounts)) + coroutine.spawn(self.server.serve_forever) coroutine.dispatch() + remote = IPCClient(mountpoint='/') + + events = [] + def read_events(): + for event in remote.subscribe(): + if 'props' in event: + event.pop('props') + events.append(event) + job = coroutine.spawn(read_events) + + self.assertEqual(False, remote.get(cmd='mounted')) + self.mounts.open() + self.mounts['/'].mounted.wait() + coroutine.sleep(1) - coroutine.select([subscription.fileno()], [], []) - self.assertEqual( - {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, - subscription.read_message()) - self.assertEqual(True, client.connected) + self.assertEqual(True, remote.get(cmd='mounted')) + self.assertEqual([ + {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, + ], + events) + del events[:] self.waitpid(pid) + coroutine.sleep(1) - coroutine.select([subscription.fileno()], [], []) - self.assertEqual( - {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False}, - subscription.read_message()) - self.assertEqual(False, client.connected) + self.assertEqual(False, remote.get(cmd='mounted')) + self.assertEqual([ + {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False}, + ], + events) + del events[:] pid = self.fork(self.restful_server) + # Ping to trigger re-connection + self.assertEqual(False, remote.get(cmd='mounted')) coroutine.sleep(1) - self.assertEqual(False, client.connected) - coroutine.select([subscription.fileno()], [], []) - self.assertEqual( - {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, - subscription.read_message()) - self.assertEqual(True, client.connected) - - self.waitpid(pid) - - coroutine.select([subscription.fileno()], [], []) - self.assertEqual( - {'mountpoint': '/', 'event': 'unmount', 'name': 'Network', 'private': False}, - subscription.read_message()) - self.assertEqual(False, client.connected) + self.assertEqual(True, remote.get(cmd='mounted')) + self.assertEqual([ + {'mountpoint': '/', 'event': 'mount', 'name': 'Network', 'private': False}, + ], + events) + del events[:] def test_upload_blob(self): self.start_ipc_and_restful_server() - remote = Client('/') + remote = IPCClient(mountpoint='/') - guid = remote.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) self.touch(('file', 'blob')) - remote.Context(guid).upload_blob('preview', 'file') - self.assertEqual('blob', remote.Context(guid).get_blob('preview').read()) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob', file(blob['path']).read()) self.touch(('file2', 'blob2')) - remote.Context(guid).upload_blob('preview', 'file2', pass_ownership=True) - self.assertEqual('blob2', remote.Context(guid).get_blob('preview').read()) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file2'), pass_ownership=True) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob2', file(blob['path']).read()) assert not exists('file2') - def test_StaleBLOBs(self): - self.start_ipc_and_restful_server() - remote = Client('/') - - guid = remote.Context( - type='activity', - title='title', - summary='summary', - description='description').post() - - http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob-1')}) - self.assertEqual('blob-1', remote.Context(guid).get_blob('preview').read()) - - cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid) - self.touch((cache_path, 'blob-2')) - self.assertEqual('blob-2', remote.Context(guid).get_blob('preview').read()) - self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno']) - - http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob-3')}) - self.assertEqual('blob-3', remote.Context(guid).get_blob('preview').read()) - self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno']) - - def test_DoNotStaleBLOBs(self): - self.start_ipc_and_restful_server() - remote = Client('/') - - guid = http.request('POST', ['context'], - headers={'Content-Type': 'application/json'}, - data={ - 'type': 'activity', - 'title': 'title', - 'summary': 'summary', - 'description': 'description', - }) - - http.request('PUT', ['context', guid, 'preview'], files={'file': StringIO('blob')}) - self.assertEqual('blob', remote.Context(guid).get_blob('preview').read()) - - cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid) - self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno']) - - # Shift seqno - connected = coroutine.Event() - self.mounts.connect(lambda event: connected.set(), event='create', mountpoint='/') - http.request('POST', ['context'], - headers={'Content-Type': 'application/json'}, - data={ - 'type': 'activity', - 'title': 'title2', - 'summary': 'summary2', - 'description': 'description2', - }) - connected.wait() - - self.assertEqual('blob', remote.Context(guid).get_blob('preview').read()) - self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno']) - def test_GetAbsentBLOB(self): self.start_ipc_and_restful_server([User, Report]) - client = Client('/') + remote = IPCClient(mountpoint='/') - guid = client.Report( - context='context', - implementation='implementation', - description='description').post() + guid = remote.post(['report'], { + 'context': 'context', + 'implementation': 'implementation', + 'description': 'description', + }) - path, mime_type = client.Report(guid).get_blob_path('data') - self.assertEqual(None, path) - self.assertEqual(True, client.Report(guid).get_blob('data').closed) + self.assertEqual(None, remote.get(['report', guid, 'data'], cmd='get_blob')) def test_GetDefaultBLOB(self): self.start_ipc_and_restful_server() - client = Client('/') + remote = IPCClient(mountpoint='/') - guid = client.Context( - type='activity', - title='title', - summary='summary', - description='description').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - path, mime_type = client.Context(guid).get_blob_path('icon') - assert exists(path) - self.assertEqual(False, client.Context(guid).get_blob('icon').closed) + blob = remote.get(['context', guid, 'icon'], cmd='get_blob') + assert not blob['path'].endswith('missing.png') + assert exists(blob['path']) + assert file(blob['path'], 'rb').read() == file('../../../sugar_network/static/images/missing.png', 'rb').read() - def test_Localize(self): - os.environ['LANG'] = 'en_US' + def test_StaleBLOBs(self): self.start_ipc_and_restful_server() - client = Client('/') + remote = IPCClient(mountpoint='/') - guid = client.Context( - type='activity', - title='title_en', - summary='summary_en', - description='description_en').post() + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) + self.touch(('file', 'blob-1')) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob-1', file(blob['path']).read()) - self.stop_servers() - os.environ['LANG'] = 'ru_RU' - self.start_ipc_and_restful_server() - client = Client('/') + cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid) + self.touch((cache_path, 'blob-2')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob-2', file(blob['path']).read()) + self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno']) - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) + self.touch(('file', 'blob-3')) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob-3', file(blob['path']).read()) + self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno']) - res['title'] = 'title_ru' - res['summary'] = 'summary_ru' - res['description'] = 'description_ru' - res.post() + def test_DoNotStaleBLOBs(self): + self.start_ipc_and_restful_server() + remote = IPCClient(mountpoint='/') - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_ru', res['title']) - self.assertEqual('summary_ru', res['summary']) - self.assertEqual('description_ru', res['description']) + guid = remote.post(['context'], { + 'type': 'activity', + 'title': 'title', + 'summary': 'summary', + 'description': 'description', + }) - self.stop_servers() - os.environ['LANG'] = 'es_ES' - self.start_ipc_and_restful_server() - client = Client('/') + self.touch(('file', 'blob-1')) + remote.put(['context', guid, 'preview'], cmd='upload_blob', path=abspath('file')) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob-1', file(blob['path']).read()) - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_en', res['title']) - self.assertEqual('summary_en', res['summary']) - self.assertEqual('description_en', res['description']) + cache_path = 'cache/context/%s/%s/preview' % (guid[:2], guid) + self.touch((cache_path, 'blob-2')) + self.assertEqual(3, json.load(file(cache_path + '.meta'))['seqno']) - self.stop_servers() - os.environ['LANG'] = 'ru_RU' - self.start_ipc_and_restful_server() - client = Client('/') + # Shift seqno + remote.put(['context', guid], {'title': 'title-2'}) + coroutine.sleep(1) - res = client.Context(guid, ['title', 'summary', 'description']) - self.assertEqual('title_ru', res['title']) - self.assertEqual('summary_ru', res['summary']) - self.assertEqual('description_ru', res['description']) + blob = remote.get(['context', guid, 'preview'], cmd='get_blob') + self.assertEqual('blob-2', file(blob['path']).read()) + self.assertEqual(4, json.load(file(cache_path + '.meta'))['seqno']) if __name__ == '__main__': diff --git a/tests/units/router.py b/tests/units/router.py index b2daef8..e9908c2 100755 --- a/tests/units/router.py +++ b/tests/units/router.py @@ -13,7 +13,7 @@ from __init__ import tests import active_document as ad from sugar_network import node -from sugar_network.node.router import Router, _Request, _parse_accept_language +from sugar_network.toolkit.router import Router, _Request, _parse_accept_language, Unauthorized from active_toolkit import util from sugar_network.resources.volume import Volume @@ -203,11 +203,11 @@ class RouterTest(tests.Test): 'PATH_INFO': '/foo', 'REQUEST_METHOD': 'GET', }) - self.assertRaises(node.Unauthorized, router._authenticate, request) + self.assertRaises(Unauthorized, router.authenticate, request) request.environ['HTTP_SUGAR_USER'] = tests.UID request.environ['HTTP_SUGAR_USER_SIGNATURE'] = tests.sign(tests.PRIVKEY, tests.UID) - user = router._authenticate(request) + user = router.authenticate(request) self.assertEqual(tests.UID, user) def test_Authorization(self): @@ -226,8 +226,8 @@ class RouterTest(tests.Test): guid = rest.post('///document//', {'term': 'probe'}) self.assertEqual( - {'term': 'probe'}, - rest.get('///document///%s///' % guid, reply='term')) + 'probe', + rest.get('///document///%s///' % guid, reply='term').get('term')) def test_HandleRedirects(self): URL = 'http://sugarlabs.org' @@ -271,6 +271,15 @@ class RouterTest(tests.Test): class Document(ad.Document): + @ad.active_property(prefix='RU', typecast=[], default=[], + permissions=ad.ACCESS_CREATE | ad.ACCESS_READ) + def user(self, value): + return value + + @ad.active_property(prefix='L', typecast=[], default=['public']) + def layer(self, value): + return value + @ad.active_property(slot=1, prefix='A', full_text=True) def term(self, value): return value @@ -294,6 +303,10 @@ class Document(ad.Document): class User(ad.Document): + @ad.active_property(prefix='L', typecast=[], default=['public']) + def layer(self, value): + return value + @ad.active_property(ad.StoredProperty) def pubkey(self, value): return value diff --git a/tests/units/subscribe_socket.py b/tests/units/subscribe_socket.py deleted file mode 100755 index f63329b..0000000 --- a/tests/units/subscribe_socket.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import json -import hashlib -import tempfile - -from gevent import socket - -from __init__ import tests - -import active_document as ad -from sugar_network import node -from active_toolkit import sockets, coroutine, util - - -class SubscribeSocketTest(tests.Test): - - def subscribe(self, host, port, ticket): - conn = coroutine.socket() - conn.connect((host, port)) - result = sockets.SocketFile(conn) - result.write_message({'ticket': ticket}) - coroutine.sleep(1) - return result - - def test_Subscribe(self): - node.only_commit_events.value = False - - self.fork(self.restful_server, [User, Document]) - rest = tests.Request('http://localhost:8800') - - with self.subscribe(**rest.post('/', None, cmd='subscribe')) as subscription: - guid = rest.post('/document', {'prop': 'value'}) - self.assertEqual( - {'layer': ['public']}, - rest.get('/document/' + guid, reply=['layer'])) - - event = subscription.read_message() - event.pop('props') - event.pop('seqno') - self.assertEqual({ - 'guid': guid, - 'document': 'document', - 'event': 'create', - }, - event) - - rest.put('/document/' + guid, {'prop': 'value2'}) - self.assertEqual( - {'prop': 'value2'}, - rest.get('/document/' + guid, reply=['prop'])) - - event = subscription.read_message() - event.pop('props') - event.pop('seqno') - self.assertEqual({ - 'event': 'update', - 'document': 'document', - 'guid': guid, - }, - event) - - rest.delete('/document/' + guid) - self.assertRaises(RuntimeError, rest.get, '/document/' + guid) - event = subscription.read_message() - event.pop('seqno') - self.assertEqual({ - 'event': 'delete', - 'document': 'document', - 'guid': guid, - }, - event) - - self.assertRaises(socket.timeout, socket.wait_read, subscription.fileno(), 1) - - def test_OnlySyncEvents(self): - node.only_commit_events.value = True - - self.fork(self.restful_server, [User, Document]) - rest = tests.Request('http://localhost:8800') - - with self.subscribe(**rest.post('/', None, cmd='subscribe')) as subscription: - guid = rest.post('/document', {'prop': 'value'}) - - self.assertEqual({ - 'document': 'document', - 'event': 'commit', - 'seqno': 2, - }, - subscription.read_message()) - - rest.put('/document/' + guid, {'prop': 'value2'}) - - self.assertEqual({ - 'document': 'document', - 'event': 'commit', - 'seqno': 3, - }, - subscription.read_message()) - - rest.delete('/document/' + guid) - - self.assertEqual({ - 'document': 'document', - 'event': 'commit', - 'seqno': 4, - }, - subscription.read_message()) - - self.assertRaises(socket.timeout, socket.wait_read, subscription.fileno(), 1) - - -class Document(ad.Document): - - @ad.active_property(slot=1, prefix='A', default='') - def prop(self, value): - return value - - @ad.active_property(ad.StoredProperty, default='') - def author(self, value): - return value - - -class User(ad.Document): - - @ad.active_property(ad.StoredProperty) - def pubkey(self, value): - return value - - @ad.active_property(ad.StoredProperty, default='') - def name(self, value): - return value - - @classmethod - def before_create(cls, props): - ssh_pubkey = props['pubkey'].split()[1] - props['guid'] = str(hashlib.sha1(ssh_pubkey).hexdigest()) - - with tempfile.NamedTemporaryFile() as tmp_pubkey: - tmp_pubkey.file.write(props['pubkey']) - tmp_pubkey.file.flush() - - pubkey_pkcs8 = util.assert_call( - ['ssh-keygen', '-f', tmp_pubkey.name, '-e', '-m', 'PKCS8']) - props['pubkey'] = pubkey_pkcs8 - - super(User, cls).before_create(props) - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py index ea2e6fb..ab24292 100755 --- a/tests/units/sync_master.py +++ b/tests/units/sync_master.py @@ -315,16 +315,12 @@ class SyncMasterTest(tests.Test): self.assertEqual(None, packet.header.get('dst')) self.assertEqual([ {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, }}, {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime}, 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, @@ -410,8 +406,6 @@ class SyncMasterTest(tests.Test): self.assertEqual(None, packet.header.get('dst')) self.assertEqual([ {'cookie': {'sn_pull': [[2, None]]}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, @@ -469,16 +463,12 @@ class SyncMasterTest(tests.Test): self.assertEqual(None, packet.header.get('dst')) self.assertEqual([ {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, 'prop': {'value': '*' * CHUNK, 'mtime': os.stat('db/document/1/1/prop').st_mtime}, }}, {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime}, 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, @@ -525,8 +515,6 @@ class SyncMasterTest(tests.Test): self.assertEqual(None, packet.header.get('dst')) self.assertEqual([ {'cookie': {}, 'filename': 'master-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, @@ -605,16 +593,12 @@ class SyncMasterTest(tests.Test): self.assertEqual(None, packet.header.get('dst')) self.assertEqual([ {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, 'prop': {'value': '', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, }}, {'cookie': {}, 'filename': 'master-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'master', 'document': 'document', 'guid': '2', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime}, 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py index 1e56158..93bdd82 100755 --- a/tests/units/sync_node.py +++ b/tests/units/sync_node.py @@ -40,16 +40,12 @@ class SyncNodeTest(tests.Test): self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-2.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': '1', 'sequence': [[1, None]]}, {'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, 'prop': {'value': 'value1', 'mtime': os.stat('db/document/1/1/prop').st_mtime}, }}, {'api_url': api_url.value, 'filename': 'node-2.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': '1', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime}, 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, @@ -67,8 +63,6 @@ class SyncNodeTest(tests.Test): node.sync('mnt', session='session') self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 'session', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, @@ -101,8 +95,6 @@ class SyncNodeTest(tests.Test): self.assertEqual([[2, None]], kwargs['diff_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, @@ -116,16 +108,12 @@ class SyncNodeTest(tests.Test): self.assertEqual([[4, None]], kwargs['diff_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/2/2/layer').st_mtime}, 'guid': {'value': '2', 'mtime': os.stat('db/document/2/2/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/2/2/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/2/2/mtime').st_mtime}, 'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/2/2/prop').st_mtime}, }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '3', 'session': 2, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/3/3/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/3/3/layer').st_mtime}, 'guid': {'value': '3', 'mtime': os.stat('db/document/3/3/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/3/3/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/3/3/mtime').st_mtime}, @@ -139,24 +127,18 @@ class SyncNodeTest(tests.Test): self.assertEqual(None, kwargs) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/4/4/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/4/4/layer').st_mtime}, 'guid': {'value': '4', 'mtime': os.stat('db/document/4/4/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/4/4/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/4/4/mtime').st_mtime}, 'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/4/4/prop').st_mtime}, }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '5', 'session': 3, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/5/5/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/5/5/layer').st_mtime}, 'guid': {'value': '5', 'mtime': os.stat('db/document/5/5/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/5/5/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/5/5/mtime').st_mtime}, 'prop': {'value': '*' * 1024, 'mtime': os.stat('db/document/5/5/prop').st_mtime}, }}, {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '6', 'session': 3, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/6/6/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/6/6/layer').st_mtime}, 'guid': {'value': '6', 'mtime': os.stat('db/document/6/6/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/6/6/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/6/6/mtime').st_mtime}, @@ -226,8 +208,6 @@ class SyncNodeTest(tests.Test): self.assertEqual([ {'filename': 'master.packet', 'content_type': 'records', 'src': 'master', 'dst': 'node', 'cmd': 'sn_ack', 'sequence': [[1, 2]], 'merged': []}, {'api_url': api_url.value, 'filename': 'node-4.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '3', 'session': 'session', 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/3/3/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/3/3/layer').st_mtime}, 'guid': {'value': '3', 'mtime': os.stat('db/document/3/3/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/3/3/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/3/3/mtime').st_mtime}, @@ -329,8 +309,6 @@ class SyncNodeTest(tests.Test): self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-1.packet', 'cmd': 'sn_pull', 'src': 'node', 'dst': 'master', 'session': session, 'sequence': [[1, None]]}, {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': session, 'diff': { - 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, - 'layer': {'value': ['public'], 'mtime': os.stat('db/document/1/1/layer').st_mtime}, 'guid': {'value': '1', 'mtime': os.stat('db/document/1/1/guid').st_mtime}, 'ctime': {'value': 0, 'mtime': os.stat('db/document/1/1/ctime').st_mtime}, 'mtime': {'value': 0, 'mtime': os.stat('db/document/1/1/mtime').st_mtime}, diff --git a/tests/units/volume.py b/tests/units/volume.py index a635b82..1642fb9 100755 --- a/tests/units/volume.py +++ b/tests/units/volume.py @@ -1,12 +1,15 @@ #!/usr/bin/env python # sugar-lint: disable +import json + from __init__ import tests import active_document as ad from sugar_network.toolkit.collection import Sequence from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, DiskFull -from sugar_network.resources.volume import Volume, Resource +from sugar_network.resources.volume import Volume, Resource, Commands +from active_toolkit import coroutine class VolumeTest(tests.Test): @@ -160,6 +163,96 @@ class VolumeTest(tests.Test): ], events) + def test_Subscribe(self): + + class Document(Resource): + + @ad.active_property(slot=1) + def prop(self, value): + return value + + volume = Volume('db', [Document]) + cp = TestCommands(volume) + events = [] + + def read_events(): + for event in cp.subscribe(ad.Response()): + assert event.startswith('data: ') + assert event.endswith('\n\n') + event = json.loads(event[6:]) + if 'props' in event: + event.pop('props') + events.append(event) + + job = coroutine.spawn(read_events) + coroutine.dispatch() + volume['document'].create(guid='guid', prop='value1') + coroutine.dispatch() + volume['document'].update('guid', prop='value2') + coroutine.dispatch() + volume['document'].delete('guid') + coroutine.dispatch() + volume['document'].commit() + coroutine.sleep(.5) + job.kill() + + self.assertEqual([ + {'guid': 'guid', 'seqno': 1, 'document': 'document', 'event': 'create'}, + {'guid': 'guid', 'seqno': 2, 'document': 'document', 'event': 'update'}, + {'guid': 'guid', 'event': 'delete', 'document': u'document'}, + ], + events) + + def test_SubscribeToOnlyCommits(self): + + class Document(Resource): + + @ad.active_property(slot=1) + def prop(self, value): + return value + + volume = Volume('db', [Document]) + cp = TestCommands(volume) + events = [] + + def read_events(): + for event in cp.subscribe(ad.Response(), only_commits=True): + assert event.startswith('data: ') + assert event.endswith('\n\n') + event = json.loads(event[6:]) + if 'props' in event: + event.pop('props') + events.append(event) + + job = coroutine.spawn(read_events) + coroutine.dispatch() + volume['document'].create(guid='guid', prop='value1') + coroutine.dispatch() + volume['document'].update('guid', prop='value2') + coroutine.dispatch() + volume['document'].delete('guid') + coroutine.dispatch() + volume['document'].commit() + coroutine.sleep(.5) + job.kill() + + self.assertEqual([ + {'seqno': 1, 'document': 'document', 'event': 'commit'}, + {'seqno': 2, 'document': 'document', 'event': 'commit'}, + {'seqno': 2, 'document': 'document', 'event': 'commit'}, + ], + events) + + +class TestCommands(ad.VolumeCommands, Commands): + + def __init__(self, volume): + ad.VolumeCommands.__init__(self, volume) + Commands.__init__(self) + + def connect(self, callback, condition=None, **kwargs): + self.volume.connect(callback, condition) + def read_packet(packet): result = [] |